• 鄭州
您的位置: 法律保 > 綜合 > 詳情

RabbitMq TTL+死信隊(duì)列 延遲消息問(wèn)題記錄

來(lái)源: 騰訊云 2023-02-22 22:59:11

延遲隊(duì)列存儲(chǔ)的對(duì)象是對(duì)應(yīng)的延遲消息,所謂的延遲消息是指當(dāng)消息被發(fā)送以后,并不想讓消費(fèi)者立刻拿到消息,而是等待特定時(shí)間后,消費(fèi)者才能拿到這個(gè)消息進(jìn)行消費(fèi)

利用RabbitMqTTL和死信隊(duì)列 來(lái)實(shí)現(xiàn)延時(shí)消費(fèi)。


(資料圖片僅供參考)

如果設(shè)置的是隊(duì)列統(tǒng)一過(guò)期時(shí)間放到死信隊(duì)列,沒(méi)有什么問(wèn)題。

如果是延時(shí)時(shí)間設(shè)置到每條消息上的。而不是給隊(duì)列的。

實(shí)現(xiàn)方式為消息存活時(shí)間為動(dòng)態(tài)用戶頁(yè)面可配置的。

這就導(dǎo)致了一個(gè)問(wèn)題:

先用一條消息的存活時(shí)間是1天。后面又進(jìn)了一條消息存活時(shí)間是1小時(shí)。

結(jié)果一小時(shí)到了,發(fā)現(xiàn)這條消息并沒(méi)有被轉(zhuǎn)發(fā)到消費(fèi)延時(shí)過(guò)期消息的隊(duì)列。

原因是盡管ttl是設(shè)給每條消息的。但是本質(zhì)上,所有延時(shí)消息都還在一個(gè)隊(duì)列里,對(duì)它過(guò)期時(shí)間的檢測(cè)也是從頭部開(kāi)始的。

它不會(huì)檢測(cè)每一條消息是否過(guò)期。而是順序檢測(cè)。

如果first in的消息過(guò)期時(shí)間很長(zhǎng),會(huì)導(dǎo)致它阻塞后進(jìn)的消息。

不僅無(wú)法實(shí)現(xiàn)真正的過(guò)期時(shí)間。還會(huì)導(dǎo)致,一個(gè)大的過(guò)期時(shí)間的先進(jìn)的消息,會(huì)堆積一堆后進(jìn)的過(guò)期時(shí)間短的消息。

問(wèn)題解決

這個(gè)時(shí)候可以使用rabbitMq的一個(gè)插件:rabbitmq_delayed_message_exchange

一段時(shí)間以來(lái),人們一直在尋找用RabbitMQ實(shí)現(xiàn)延遲消息的傳遞方法,到目前為止,公認(rèn)的解決方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此來(lái)實(shí)現(xiàn)的,RabbitMQ延遲消息插件新增了一種新的交換器類型,消息通過(guò)這種交換器路由就可以實(shí)現(xiàn)延遲發(fā)送

插件安裝

需要根據(jù)自己的rabbitMq選擇對(duì)應(yīng)的版本。我rabbitMq的版本是RabbitMQ 3.11.0,對(duì)應(yīng)的插件版本就是:3.11.1

基于Linux

--1、cd到rabbitmq默認(rèn)安裝位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通過(guò)ftp工具將插件上傳到此目錄下--3、開(kāi)啟插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重啟MQ服務(wù)systemctl restart rabbitmq-server

基于Docker

--1、通過(guò)ftp工具將插件上傳到Linux服務(wù)器的根目錄下--2、拷貝到docker中rabbitmq插件目錄下,rabbitmq_delayed_message_exchange-3.9.0.ez(下載包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、進(jìn)入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(確保2中的操作已經(jīng)將插件拷貝過(guò)來(lái)了)cd pluginsls |grep delay--5、開(kāi)啟插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重啟MQ服務(wù)docker restart 容器ID

安裝成功

web界面新建交換機(jī)選擇類型出現(xiàn)紅框標(biāo)注即表示成功

image.png

代碼實(shí)現(xiàn)

1:springBoot配置

@Configurationpublic class DelayRabbitmqConfig {     /**     * 聲明延遲隊(duì)列     * @return     */    @Bean    public Queue delayQueue(){        return new Queue(QueueConstant.DelayQueue,                true,false,false);    }     /**     * 聲明延遲自定義交換機(jī)類型     * @return     */    @Bean    public CustomExchange delayCustomExchange(){        HashMap args = new HashMap<>();//        設(shè)置 x-delayed-type 為 direct,當(dāng)然也可以是 topic 等 發(fā)送消息時(shí)設(shè)置消息頭 headers 的 x-delay 屬性,即延遲時(shí)間,如果不設(shè)置消息將會(huì)立即投遞        args.put("x-delayed-type","direct");        return new CustomExchange(ExchangeConstant.DelayCustomerExchange,                "x-delayed-message",true,false,args);    }     /**     * 綁定延遲交換機(jī)和隊(duì)列     * @return     */    @Bean    public Binding delayQueueAndCustomExchange(){        return BindingBuilder.bind(delayQueue())                .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs();    }}

springMvc配置

引入依賴:    xmlns:util="http://www.springframework.org/schema/util"    http://www.springframework.org/schema/util    http://www.springframework.org/schema/util/spring-util-4.0.xsd                                                                                                                

代碼實(shí)現(xiàn)

//消息發(fā)送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每條消息時(shí)間配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延遲消息處理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor {    /**     * 消息延遲時(shí)間,單位:毫秒     */    private final Integer TTL;    public MyMessagePostProcessor(final Integer ttl) {        this.TTL = ttl;    }    @Override    public Message postProcessMessage(Message message) throws AmqpException {        message.getMessageProperties().setDelay(TTL);        return message;    }}
標(biāo)簽: RabbitMQ
溫馨提示:

在實(shí)際法律問(wèn)題情景中,個(gè)案情況都有所差異,為了高效解決您的問(wèn)題,保障合法權(quán)益,建議您直接向?qū)I(yè)律師說(shuō)明情況,解決您的實(shí)際問(wèn)題。 立即在線咨詢 >

相關(guān)知識(shí)推薦
操作
分享
15037178970

公眾服務(wù)

法制網(wǎng)公眾號(hào)

快速找律師 / 免費(fèi)咨詢

查法律知識(shí) / 查看解答 / 隨時(shí)追問(wèn)

律師服務(wù)(工作日8:30-18:00 ,非工作日請(qǐng)QQ留言)

律師加盟

律師營(yíng)銷服務(wù)

在線客服:

加盟熱線:

律師營(yíng)銷診斷

營(yíng)銷分析 / 回復(fù)咨詢

案件接洽 / 合作加盟

法律保,中國(guó)知名的 法律咨詢網(wǎng)站,能夠?yàn)閺V大用戶提供在線 免費(fèi)法律咨詢服務(wù)。
CopyRight@2003-2023 falvbao.net.cn ALL Rights Reservrd 版權(quán)所有
皖I(lǐng)CP備2022009963號(hào)-45
違法和不良信息聯(lián)系郵箱:39 60 29 14 2 @qq.com