我想只要是互联网公司,难免会和消息队列打交道吧,毕竟作为异步化处理的首选工具,在针对流量突变时的削峰填谷功能,还是非常优秀的。
大部分公司使用RabbitMQ/RocketMQ/Kafka,我这里是针对个人发现企业中经常出现的RabbitMQ滥用/错用情况,进行梳理,顺便提出一些改造建议。
核心结构梳理
网上关于RabbitMQ的各种基础知识已经很多,这里不做赘述,直奔主题,说说比较核心的两个方面。
生产者将消息发送到交换机,发送的时候需要指定路由键(Routing Key)
注意,指定的是路由键,并不是队列名称,生产者端,并不知道有哪些队列
生产中使用问题:经常以为是发送给队列,而且会有convertAndSend(“exchange”, “xxx.xxx.queue”, data)这种用法,中间参数xxx.xxx.queue
但其实该参数是表示的路由键Routing Key
,并不是队列。
消费者生成一个队列,将队列使用一个Key绑定到交换机
注意,这个Key以前叫做绑定键Binding Key
,现在官方统一也叫做路由键Routing Key
,同样,也不是队列名称(对于direct类型交换机,这两倒是一样,其他的交换机,并不一样)
生产中使用问题:同样是搞混队列使用的路由键,类似代码:
return BindingBuilder.bind(addressQueue).to(addressExchange).with(ADDRESS_QUEUE);
这里的ADDRESS_QUEUE
是"xxx.yyy.queue"
字样,这是错误理解,正确应该是"xxx.yyy.key”之类,最起码不带queue
,含义不一样,不利于理解。
这里使用图片阐述一下正确理解:
这里是错误理解:
关键模式交换机使用方式
Fanout交换机–广播机制
Fanout
交换机,直译过来就是扇形交换机,他不关注任何路由键,只要发送到该交换机上的内容,会被发送到每一个绑定到该交换机的队列上,是一个典型的广播机制。
Topic交换机 – 主题订阅
Topic顾名思义就是订阅主题相关,生产者方发送确切的Routing Key
+ 消息到交换机,消费者根据 确切/不确切的Routing Key
来匹配路由
其中:
*
表示一个单词#
表示0个或者多个单词
如果整个Routing Key
就是 #
, 那么表示匹配该交换机全部的消息
错误使用的例子:
上图有两个问题:
1、所有的Routing Key
全是 xxx.queue
这种格式,和队列名一样,明显是开发搞混了
2、有Routing Key
其实对应的都是同一个消息,但是因为开发以为是面向队列发送消息,发送给四个队列,导致出现这种Routing Key
不同但是消息内容完全一样的情况
那么如何改造呢?
下面有一些改造建议
规范
fanout 类型
因为无需考虑绑定建Routing Key,所以需要在交换机Exchange中添加业务描述(下方的business)
生产者:
交换机名字: domain.service.business.exchange
例如central-content.area.area_changed.exchange
反面例子: rma.wms.exchange
(不知道这个属于那个domain)
消费者:
队列名字 : domain.service.queue 例子: central-so.order.queue
路由键名字: 不需要
发送方代码:
xxx.convertAndSend("central-content.area.area_changed.exchange", "", data)
接收方:
绑定队列 central-so.order.queue
到domain.service.exchange
,绑定键为""即可
String queueName = "central-so.order.queue";
String exchangeName = "central-content.area.area_changed.exchange";
@Bean
public Queue refreshOrderQueue(){
return new Queue(queueName);
}
@Bean
public FanoutExchange priceChangedExchange(){
return new FanoutExchange(exchangeName);
}
@Bean
public Binding itemPriceBinding(@Qualifier(exchangeName) FanoutExchange exchange, @Qualifier(queueName)Queue queue){
return BindingBuilder.bind(queue).to(exchange);
}
topic类型
生产者:
交换机名字: domain.exchange
例如central-im.exchange
反面例子: rma.wms.exchange
(不知道这个属于哪个domain)
路由键名字: 生产者方的业务描述 business
例如 price.changed
xxx.convertAndSend("central-im.exchange", "price.changed", data)
消费者:
队列名字 : domain.business.queue
例如 ec-search.priority.queue
路由键名字:按需分配,可以试用* 和 #来做通配 例如 price.changed
或者price.*
或者 price.#
接收方绑定队列 ec-search.priority.queue
到central-im.exchange
,绑定键为price.changed
String queueName = "ec-search.priority.queue";
String exchangeName = "central-im.exchange";
@Bean
public Queue changePriorityQueue(){
return new Queue(queueName);
}
@Bean
public TopicExchange priceChangedExchange(){
return new TopicExchange(exchangeName);
}
@Bean
public Binding itemPriceBinding(@Qualifier(exchangeName) FanoutExchange exchange, @Qualifier(queueName)Queue queue){
return BindingBuilder.bind(queue).to(exchange).with("price.changed");
}
建议
个人比较建议使用Topic
类型交换机,基于领域划分
交换机。但是这就需要对Routing Key
有比较合理的划分,知道如何使用通配符来订阅自己感兴趣的消息。
不过对于不太注重领域划分的地方,使用Fanout也是一个还不错的选择,毕竟就是简单粗暴的广播机制。
不建议使用direct类型的交换机,毕竟我们的消息,很可能日后会有其他消费方。当然,如果你用RabbitMQ
的RPC
功能,就可以保留。
评论区