侧边栏壁纸
博主头像
翻斗

开始一件事最好是昨天,其次是现在

  • 累计撰写 44 篇文章
  • 累计创建 42 个标签
  • 累计收到 3 条评论

一些RabbitMQ的使用问题和改造建议

翻斗
2022-12-10 / 0 评论 / 1 点赞 / 1,350 阅读 / 3,037 字

我想只要是互联网公司,难免会和消息队列打交道吧,毕竟作为异步化处理的首选工具,在针对流量突变时的削峰填谷功能,还是非常优秀的。

大部分公司使用RabbitMQ/RocketMQ/Kafka,我这里是针对个人发现企业中经常出现的RabbitMQ滥用/错用情况,进行梳理,顺便提出一些改造建议。

核心结构梳理

网上关于RabbitMQ的各种基础知识已经很多,这里不做赘述,直奔主题,说说比较核心的两个方面。

生产者将消息发送到交换机,发送的时候需要指定路由键(Routing Key)

注意,指定的是路由键,并不是队列名称,生产者端,并不知道有哪些队列

生产中使用问题:经常以为是发送给队列,而且会有convertAndSend(“exchange”, “xxx.xxx.queue”, data)这种用法,中间参数xxx.xxx.queue但其实该参数是表示的路由键Routing Key,并不是队列。

消费者生成一个队列,将队列使用一个Key绑定到交换机

注意,这个Key以前叫做绑定键Binding Key,现在官方统一也叫做路由键Routing Key,同样,也不是队列名称(对于direct类型交换机,这两倒是一样,其他的交换机,并不一样)

生产中使用问题:同样是搞混队列使用的路由键,类似代码:
return BindingBuilder.bind(addressQueue).to(addressExchange).with(ADDRESS_QUEUE); 这里的ADDRESS_QUEUE"xxx.yyy.queue"字样,这是错误理解,正确应该是"xxx.yyy.key”之类,最起码不带queue,含义不一样,不利于理解。

这里使用图片阐述一下正确理解:

这里是错误理解:

关键模式交换机使用方式

Fanout交换机–广播机制

Fanout交换机,直译过来就是扇形交换机,他不关注任何路由键,只要发送到该交换机上的内容,会被发送到每一个绑定到该交换机的队列上,是一个典型的广播机制。

Topic交换机 – 主题订阅

Topic顾名思义就是订阅主题相关,生产者方发送确切的Routing Key + 消息到交换机,消费者根据 确切/不确切的Routing Key来匹配路由

其中:

  • *表示一个单词
  • #表示0个或者多个单词

如果整个Routing Key就是 #, 那么表示匹配该交换机全部的消息

错误使用的例子:

上图有两个问题:
1、所有的Routing Key 全是 xxx.queue这种格式,和队列名一样,明显是开发搞混了
2、有Routing Key其实对应的都是同一个消息,但是因为开发以为是面向队列发送消息,发送给四个队列,导致出现这种Routing Key 不同但是消息内容完全一样的情况

那么如何改造呢?

下面有一些改造建议

规范

fanout 类型

因为无需考虑绑定建Routing Key,所以需要在交换机Exchange中添加业务描述(下方的business)

生产者:

交换机名字: domain.service.business.exchange 例如central-content.area.area_changed.exchange
反面例子: rma.wms.exchange (不知道这个属于那个domain)

消费者:
队列名字 : domain.service.queue 例子: central-so.order.queue

路由键名字: 不需要

发送方代码:

     xxx.convertAndSend("central-content.area.area_changed.exchange", "", data)

接收方:

绑定队列 central-so.order.queuedomain.service.exchange ,绑定键为""即可

String queueName = "central-so.order.queue";
String exchangeName = "central-content.area.area_changed.exchange";
@Bean
public Queue refreshOrderQueue(){
    return new Queue(queueName);
}
 
@Bean
public FanoutExchange priceChangedExchange(){
    return new FanoutExchange(exchangeName);
}
 
@Bean
public Binding itemPriceBinding(@Qualifier(exchangeName) FanoutExchange exchange, @Qualifier(queueName)Queue queue){
    return BindingBuilder.bind(queue).to(exchange);
}

topic类型

生产者:

交换机名字: domain.exchange 例如central-im.exchange 反面例子: rma.wms.exchange (不知道这个属于哪个domain)

路由键名字: 生产者方的业务描述 business 例如 price.changed

xxx.convertAndSend("central-im.exchange", "price.changed", data)

消费者:

队列名字 : domain.business.queue 例如 ec-search.priority.queue

路由键名字:按需分配,可以试用* 和 #来做通配 例如 price.changed 或者price.* 或者 price.#

接收方绑定队列 ec-search.priority.queuecentral-im.exchange ,绑定键为price.changed

String queueName = "ec-search.priority.queue";
String exchangeName = "central-im.exchange";
@Bean
public Queue changePriorityQueue(){
    return new Queue(queueName);
}
 
@Bean
public TopicExchange priceChangedExchange(){
    return new TopicExchange(exchangeName);
}
 
@Bean
public Binding itemPriceBinding(@Qualifier(exchangeName) FanoutExchange exchange, @Qualifier(queueName)Queue queue){
    return BindingBuilder.bind(queue).to(exchange).with("price.changed");
}

建议

个人比较建议使用Topic类型交换机,基于领域划分交换机。但是这就需要对Routing Key有比较合理的划分,知道如何使用通配符来订阅自己感兴趣的消息。

不过对于不太注重领域划分的地方,使用Fanout也是一个还不错的选择,毕竟就是简单粗暴的广播机制。

不建议使用direct类型的交换机,毕竟我们的消息,很可能日后会有其他消费方。当然,如果你用RabbitMQRPC功能,就可以保留。

1

评论区