前言
前面我們介紹了RabbitMQ的安裝、各大消息中間件的對比、AMQP核心概念、管控臺的使用、快速入門RabbitMQ。本章將介紹RabbitMQ的高級特性。分兩篇(上/下)進(jìn)行介紹。
* 消息如何保障100%的投遞成功?
* 冪等性概念詳解
* 在海量訂單產(chǎn)生的業(yè)務(wù)高峰期,如何避免消息的重復(fù)消費(fèi)的問題?
* Confirm確認(rèn)消息、Return返回消息
1 消息如何保障100%的投遞成功?
1.1 什么是生產(chǎn)端的可靠性投遞?
* 保障消息的成功發(fā)出
* 保障MQ節(jié)點(diǎn)的成功接收
* 發(fā)送端收到MQ節(jié)點(diǎn)(Broker)確認(rèn)應(yīng)答
* 完善的消息進(jìn)行補(bǔ)償機(jī)制
前三步不一定能保障消息能夠100%投遞成功。因此要加上第四步
BAT/TMD 互聯(lián)網(wǎng)大廠的解決方案:
- 消息落庫,對消息狀態(tài)進(jìn)行打標(biāo)
在發(fā)送消息的時(shí)候,需要將消息持久化到數(shù)據(jù)庫中,并給這個(gè)消息設(shè)置一個(gè)狀態(tài)(未發(fā)送、發(fā)送中、到達(dá))。當(dāng)消息狀態(tài)發(fā)生了變化,需要對消息做一個(gè)變更。針對沒有到達(dá)的消息做一個(gè)輪訓(xùn)操作,重新發(fā)送。對輪訓(xùn)次數(shù)也需要做一個(gè)限制3-5次。確保消息能夠成功的發(fā)送.
* 消息的延遲投遞,做二次確認(rèn),回調(diào)檢查
具體采用哪種方案,還需要根據(jù)業(yè)務(wù)與消息的并發(fā)量而定。
1.2 第一種方案:
生產(chǎn)端-可靠性投遞
圖解:
藍(lán)色部分表示:生產(chǎn)者負(fù)責(zé)發(fā)送消息發(fā)送至Broker端
Biz DB:訂單數(shù)據(jù)庫 MSG DB: 消息數(shù)據(jù)
面對小規(guī)模的應(yīng)用可以采用加事務(wù)的方式,保證事務(wù)的一致性。但在大廠中面對高并發(fā),并沒有加事務(wù),事務(wù)的性能拼接非常嚴(yán)重,而是做補(bǔ)償。
比如:如下發(fā)一條訂單消息。
step1:存儲訂單消息(創(chuàng)建訂單),業(yè)務(wù)數(shù)據(jù)入庫,消息也入庫。缺點(diǎn):需要持久化兩次。(status:0)
step2:在step1成功的前提下,發(fā)送消息
step3:Broker收到消息后,confirm給我們的生產(chǎn)端。Confirm Listener異步監(jiān)聽Broker回送的消息。
step4:抓取出指定的消息,更新(status=1),表示消息已經(jīng)投遞成功。
step5:分布式定時(shí)任務(wù)獲取消息狀態(tài),如果等于0則抓取數(shù)據(jù)出來。
step6:重新發(fā)送消息
step7:重試限制設(shè)置3次。如果消息重試了3次還是失敗,那么(status=2),認(rèn)為這個(gè)消息就是失敗的。
查詢這些消息為什么失敗,可能需要人工去查詢。
假設(shè)step2執(zhí)行成功,step3由于網(wǎng)絡(luò)閃斷。那么confirm將永遠(yuǎn)收不到消息,那么我們需要設(shè)定一個(gè)規(guī)則:
例如:在消息入庫的時(shí)候,設(shè)置一個(gè)臨界值 timeout=5min,當(dāng)超過5min之后,就將這條數(shù)據(jù)抓取出來。
或者寫一個(gè)定時(shí)任務(wù)每隔5分鐘就將status=0的消息抓取出來??赡艽嬖谛栴}:消息發(fā)送出去,定時(shí)任務(wù)又正好剛執(zhí)行,Confirm還未收到,定時(shí)任務(wù)就會執(zhí)行,會導(dǎo)致消息執(zhí)行兩次。
更精細(xì)化操作:消息超時(shí)容忍限制。confirm在2-3分鐘內(nèi)未收到消息,則重新發(fā)送。
* 保障MQ我們思考如果第一種可靠性投遞,在高并發(fā)的場景下是否合適?
第一種方案對數(shù)據(jù)有兩次入庫,一次業(yè)務(wù)數(shù)據(jù)入庫,一次消息入庫。這樣對數(shù)據(jù)的入庫是一個(gè)瓶頸。
其實(shí)我們只需要對業(yè)務(wù)進(jìn)行入庫。
* 消息的延遲投遞,做二次確認(rèn),回調(diào)檢查
這種方式并不一定能保證100%成功,但是也能保證99.99%的消息成功。如果遇到特別極端的情況,那么就只能需要人工去補(bǔ)償,或者定時(shí)任務(wù)去做。
第二種方式主要是為了減少對數(shù)據(jù)庫的操作。
看下第二種方式:
圖解:
Upstream service:生產(chǎn)端
DownStream service:消費(fèi)端
Callback service:回調(diào)服務(wù)
step1:業(yè)務(wù)消息入庫成功后,第一次消息發(fā)送。
step2:同樣在消息入庫成功后,發(fā)送第二次消息,這兩條消息是同時(shí)發(fā)送的。第二條消息是延遲檢查,可以設(shè)置2min、5min 延遲發(fā)送。
step3:消費(fèi)端監(jiān)聽指定隊(duì)列。
step4:消費(fèi)端處理完消息后,內(nèi)部生成新的消息send confirm。投遞到MQ Broker。
step5: Callback Service 回調(diào)服務(wù)監(jiān)聽MQ Broker,如果收到Downstream
service發(fā)送的消息,則可以確定消息發(fā)送成功,執(zhí)行消息存儲到MSG DB。
step6:Check Detail檢查監(jiān)聽step2延遲投遞的消息。此時(shí)兩個(gè)監(jiān)聽的隊(duì)列不是同一個(gè),5分鐘后,Callback
service收到消息,檢查MSG DB。如果發(fā)現(xiàn)之前的消息已經(jīng)投遞成功,則不需要做其他事情。如果檢查發(fā)現(xiàn)失敗,則Callback 進(jìn)行補(bǔ)償,主動發(fā)送RPC
通信。通知上游生產(chǎn)端重新發(fā)送消息。
這樣做的目的:少做了一次DB存儲。關(guān)注點(diǎn)并不是百分百的投遞成功,而是性能。
2. 冪等性概念
2.1 冪等性是什么?
冪等(idempotent、idempotence)是一個(gè)數(shù)學(xué)與計(jì)算機(jī)學(xué)概念,常見于抽象代數(shù)中,即f(f(x)) = f(x)。簡單的來說就是
一個(gè)操作多次執(zhí)行產(chǎn)生的結(jié)果與一次執(zhí)行產(chǎn)生的結(jié)果一致。
* 我們可以借鑒數(shù)據(jù)庫的樂觀鎖機(jī)制:
* 比如我們執(zhí)行一條更新庫存的SQL語句:
* UPDATE T_REPS SET COUNT = COUNT - 1,VERSION = VERSION + 1 WHERE VERSION = 1
利用加版本號Version的方式來保證冪等性。
推薦文章:面試必備的數(shù)據(jù)庫悲觀鎖與樂觀鎖
<https://mp.weixin.qq.com/s?__biz=MzIwMTg3NzYyOA==&mid=2247483733&idx=1&sn=1f066b1446a0a132af8648481063c021&chksm=96e67046a191f9508a133f6c37f2420140b6ca092eaf39012e6fbfa86874fbb57edef6d66b4e&token=1187527588&lang=zh_CN#rd>
2.2 消費(fèi)端-冪等性保障
在海量訂單產(chǎn)生的業(yè)務(wù)高峰期,如何避免消息的重復(fù)消費(fèi)問題?
在高并發(fā)的情況下,會有大量的消息到達(dá)MQ,消費(fèi)端需要監(jiān)聽大量的消息。這樣的情況下,難免會出現(xiàn)消息的重復(fù)投遞,網(wǎng)絡(luò)閃斷等等。如果不去做冪等,則會出現(xiàn)消息的重復(fù)消費(fèi)。
-消費(fèi)端實(shí)現(xiàn)冪等性,就意味著,我們的消息永遠(yuǎn)不會被消費(fèi)多次,即使我們收到了多條一樣的消息,也只會執(zhí)行一次。
看下互聯(lián)網(wǎng)大廠主流的冪等性操作:
-唯一ID+指紋嗎機(jī)制,利用數(shù)據(jù)庫主鍵去重。
-利用Redis的原子性實(shí)現(xiàn)
-其他的技術(shù)實(shí)現(xiàn)冪等性
2.2.1 唯一ID+指紋碼機(jī)制
* 唯一ID + 指紋嗎機(jī)制,利用數(shù)據(jù)庫主鍵去重。
保證唯一性
* SELECT COUNT(1) FROM T_ORDER WHERE ID = 唯一ID + 指紋碼
如果查詢沒有,則添加。有則不需要做任何操作,消費(fèi)端不需要消費(fèi)消息。
* 好處:實(shí)現(xiàn)簡單
* 壞處:高并發(fā)下有數(shù)據(jù)庫寫入的性能瓶頸
* 解決方案:跟進(jìn)ID進(jìn)行分庫分表進(jìn)行算法路由
分?jǐn)偭髁繅毫Α?
2.2.2 Redis 原子特性實(shí)現(xiàn)
最簡單使用Redis的自增。
* 使用Redis進(jìn)行冪等,需要考慮的問題。
* 第一:我們是否需要數(shù)據(jù)落庫,如果落庫的話,關(guān)鍵解決的問題是數(shù)據(jù)庫和緩存如何做到原子性?
加事務(wù)不行,Redis和數(shù)據(jù)庫的事務(wù)不是同一個(gè),無法保證同時(shí)成功同時(shí)失敗。大家有什么更好的方案呢?
* 第二:如果不進(jìn)行落庫,那么都存儲到緩存中,如何設(shè)置定時(shí)同步的策略?
怎么做到緩存數(shù)據(jù)的穩(wěn)定性?
3. Confirm 確認(rèn)消息
理解Confirm 消息確認(rèn)機(jī)制:
* 消息的確認(rèn),是指生產(chǎn)者投遞消息后,如果Broker收到消息,則會給我們生產(chǎn)者一個(gè)應(yīng)答。
* 生產(chǎn)者進(jìn)行接收應(yīng)答,用來確定這條消息是否正常的發(fā)送到Broker,這種方式也是消息的可靠性投遞的核心保障!
藍(lán)色:producer 生產(chǎn)者 紅色:MQ Broker 服務(wù)器
生產(chǎn)者把消息發(fā)送到Broker端,Broker收到消息之后回送給producer。Confirm Listener 監(jiān)聽?wèi)?yīng)答。
操作是異步操作,當(dāng)生產(chǎn)者發(fā)送完消息之后,就不需要管了。Confirm Listener 監(jiān)聽MQ Broker的應(yīng)答。
3.1 如何實(shí)現(xiàn)Confirm確認(rèn)消息?
第一步:在channel上開啟確認(rèn)模式:channel.confirmSelect()
第二步;在chanel上 添加監(jiān)聽:addConfirmListener,監(jiān)聽成功和失敗的返回結(jié)果,根據(jù)具體的結(jié)果對消息進(jìn)行重新發(fā)送、或記錄日志等后續(xù)處理!
3.2 代碼編寫:
生產(chǎn)者:
/** * * @ClassName: Producer * @Description: 生產(chǎn)者 * @author Coder編程 * @date
2019年7月30日 上午21:27:02 * */ public class Producer { public static void
main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory Connection
connection = ConnectionUtils.getConnection(); //2 通過Connection創(chuàng)建一個(gè)新的Channel
Channel channel = connection.createChannel(); //3 指定我們的消息投遞模式: 消息的確認(rèn)模式
channel.confirmSelect(); String exchangeName = "test_confirm_exchange"; String
routingKey = "confirm.save"; //4 發(fā)送一條消息 String msg = "Hello RabbitMQ Send
confirm message!"; channel.basicPublish(exchangeName, routingKey, null,
msg.getBytes()); //5 添加一個(gè)確認(rèn)監(jiān)聽 用于發(fā)送消息到Broker端之后,回送消息的監(jiān)聽
channel.addConfirmListener(new ConfirmListener() { @Override public void
handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------"); } @Override public void
handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------"); } }); } }
消費(fèi)者:
/** * * @ClassName: Consumer * @Description: 消費(fèi)者 * @author Coder編程 * @date
2019年7月30日 上午21:32:02 * */ public class Consumer { public static void
main(String[] args) throws Exception { //1 獲取一個(gè)連接 Connection connection =
ConnectionUtils.getConnection(); //2通過Connection創(chuàng)建一個(gè)新的Channel Channel channel =
connection.createChannel(); String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#"; String queueName = "test_confirm_queue"; //3
聲明交換機(jī)和隊(duì)列 然后進(jìn)行綁定設(shè)置, 最后制定路由Key channel.exchangeDeclare(exchangeName, "topic",
true); channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey); //4 創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery
delivery = queueingConsumer.nextDelivery(); String msg = new
String(delivery.getBody()); System.err.println("消費(fèi)端: " + msg); } } }
工具類:
/** * * @ClassName: ConnectionUtils * @Description: 連接工具類 * @author Coder編程 *
@date 2019年6月21日 上午22:28:22 * */ public class ConnectionUtils { public static
Connection getConnection() throws IOException, TimeoutException { //定義連接工廠
ConnectionFactory factory = new ConnectionFactory(); //設(shè)置服務(wù)地址
factory.setHost("127.0.0.1"); //端口 factory.setPort(5672);//amqp協(xié)議 端口
類似與mysql的3306 //設(shè)置賬號信息,用戶名、密碼、vhost factory.setVirtualHost("/vhost_cp");
factory.setUsername("user_cp"); factory.setPassword("123456"); // 通過工程獲取連接
Connection connection = factory.newConnection(); return connection; } }
先啟動消費(fèi)端=》再啟動生產(chǎn)端
3.3 查看管控臺:
3.4 打印結(jié)果:
可以觀察到消費(fèi)端先接收到消息,之后生產(chǎn)端再接收到回調(diào)信息。如果出現(xiàn)磁盤已滿、RabbitMQ出現(xiàn)異常、queue容量到達(dá)上限都可能接收到no ack
如果ack和no ack消息都未接收到,這就是之前所說的。RabbitMQ出現(xiàn)網(wǎng)絡(luò)閃斷,可以采用上面所說的消息補(bǔ)償。
4. Return消息機(jī)制
* Return Listener用于處理一些不可路由的消息!
* 我們的消息生產(chǎn)者,通過指定一個(gè)Exchange和Routingkey,把消息送達(dá)到某一個(gè)隊(duì)列中去,然后我們的消費(fèi)者監(jiān)聽隊(duì)列,進(jìn)行消費(fèi)處理操作!
*
但是在某些情況下,如果我們在發(fā)送消息的時(shí)候,當(dāng)前的exchange不存在或者指定的路由key路由不到,這個(gè)時(shí)候如果我們需要監(jiān)聽這種不可達(dá)的消息,就要使用Return
Listener!
在基礎(chǔ)API中有一個(gè)關(guān)鍵的配置項(xiàng):
* Mandatory:如果為true,則監(jiān)聽器會接收到路由不可達(dá)的消息,然后進(jìn)行后續(xù)處理,如果為false,那么broker端自動刪除該消息!
4.1 Return消息機(jī)制流程
Producer生產(chǎn)端將消息發(fā)送到MQ Broker端,但是出現(xiàn)NotFind
Exchange,發(fā)送的消息的Exchange,在Broker端未能找到?;蛘哒业搅?,但是路由key路由不到指定的隊(duì)列。因此是一個(gè)錯(cuò)誤的消息。
這個(gè)時(shí)候,生產(chǎn)端應(yīng)該知道發(fā)送的這條消息,并不會被處理。因此MQ
Broker提供了這種Return機(jī)制,將這些不可達(dá)的消息發(fā)送給生產(chǎn)端,這時(shí)候生產(chǎn)端就需要設(shè)置Return
Listener去接收這些不可達(dá)的消息。然后及時(shí)記錄日志,去處理這些消息。
4.2 代碼演示
生產(chǎn)者:
/** * * @ClassName: Producer * @Description: 生產(chǎn)者 * @author Coder編程 * @date
2019年7月30日 上午22:03:22 * */ public class Producer { public static void
main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory Connection
connection = ConnectionUtils.getConnection(); Channel channel =
connection.createChannel(); String exchange = "test_return_exchange"; String
routingKey = "return.save"; String routingKeyError = "abc.save"; String msg =
"Hello RabbitMQ Return Message"; channel.addReturnListener(new ReturnListener()
{ @Override public void handleReturn(int replyCode, String replyText, String
exchange, String routingKey, AMQP.BasicProperties properties, byte[] body)
throws IOException { System.err.println("---------handle return----------");
//響應(yīng)碼 System.err.println("replyCode: " + replyCode); //響應(yīng)文本
System.err.println("replyText: " + replyText); System.err.println("exchange: "
+ exchange); System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties); System.err.println("body: " +
new String(body)); } }); //第三個(gè)參數(shù)mandatory=true,意味著路由不到的話mq也不會刪除消息,false則會自動刪除
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
//修改routingkey,測試是否能夠收到消息 //channel.basicPublish(exchange, routingKeyError,
true, null, msg.getBytes()); } }
消費(fèi)者:
/** * * @ClassName: Consumer * @Description: 消費(fèi)者 * @author Coder編程 * @date
2019年7月30日 上午22:33:34 * */ public class Consumer { public static void
main(String[] args) throws Exception { //1 創(chuàng)建ConnectionFactory Connection
connection = ConnectionUtils.getConnection(); Channel channel =
connection.createChannel(); String exchangeName = "test_return_exchange";
String routingKey = "return.#"; String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey); QueueingConsumer
queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer); while(true){ Delivery
delivery = queueingConsumer.nextDelivery(); String msg = new
String(delivery.getBody()); System.err.println("消費(fèi)者: " + msg); } } }
ConnectionUtils 工具代碼在上面。
啟動消費(fèi)端,并查看管控臺。
4.3 查看管控臺
4.4 查看打印結(jié)果
放開消費(fèi)端代碼:channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
消費(fèi)端打印結(jié)果:
可以看到打印結(jié)果正常,此時(shí)再改代碼為:
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
可以看到生產(chǎn)端接收到了不可達(dá)的消息。
文末
歡迎關(guān)注個(gè)人微信公眾號:Coder編程
獲取最新原創(chuàng)技術(shù)文章和免費(fèi)學(xué)習(xí)資料,更有大量精品思維導(dǎo)圖、面試資料、PMP備考資料等你來領(lǐng),方便你隨時(shí)隨地學(xué)習(xí)技術(shù)知識!
新建了一個(gè)qq群:315211365,歡迎大家進(jìn)群交流一起學(xué)習(xí)。謝謝了!也可以介紹給身邊有需要的朋友。
文章收錄至
Github: https://github.com/CoderMerlin/coder-programming
Gitee: https://gitee.com/573059382/coder-programming
歡迎關(guān)注并star~
參考文章:
《RabbitMQ消息中間件精講》
推薦文章:
消息中間件——RabbitMQ(四)命令行與管控臺的基本操作!
<https://mp.weixin.qq.com/s?__biz=MzIwMTg3NzYyOA==&mid=2247483858&idx=1&sn=2855220277c7c0ba4c1eea7824ea1684&chksm=96e670c1a191f9d7044c274552313fd46346f9192299a36841cd29c5294546335e3706d6968d&token=1195735466&lang=zh_CN#rd>
消息中間件——RabbitMQ(五)快速入門生產(chǎn)者與消費(fèi)者,SpringBoot整合RabbitMQ!
<https://mp.weixin.qq.com/s?__biz=MzIwMTg3NzYyOA==&mid=2247483875&idx=1&sn=0192f99cf8b0c1123f03ca4b392266e7&chksm=96e670f0a191f9e618dc4b143ec62698d945525348593794bfbaf567283fb3a1b37cfe9a9624&token=740701108&lang=zh_CN#rd>
消息中間件——RabbitMQ(六)理解Exchange交換機(jī)核心概念!
<https://mp.weixin.qq.com/s?__biz=MzIwMTg3NzYyOA==&mid=2247483879&idx=1&sn=b3f89d2cb50271727b27e04a315c0a0f&chksm=96e670f4a191f9e208248fd6006926a3a7c45209646e871e8115180ee45b7ebd4e287862a456&token=1187527588&lang=zh_CN#rd>
熱門工具 換一換
感谢您访问我们的网站,您可能还对以下资源感兴趣:
调教肉文小说-国产成本人片免费av-空姐av种子无码-在线观看免费午夜视频-综合久久精品激情-国产成人丝袜视频在线观看软件-大芭区三区四区无码-啊啊好爽啊啊插啊用力啊啊-wanch视频网-国产精品成人a免费观看