這是一個(gè)基于消息的分布式事務(wù)的一部分,主要通過消息來實(shí)現(xiàn),生產(chǎn)者把消息發(fā)到隊(duì)列后,由消費(fèi)方去執(zhí)行剩下的邏輯,而當(dāng)消費(fèi)方處理失敗后,我們需要進(jìn)行重試,即為了最現(xiàn)數(shù)據(jù)的最終一致性,在rabbitmq里,它有消息重試和重試次數(shù)的配置,但當(dāng)你配置之后,你的TTL達(dá)到
后,消息不能自動(dòng)放入死信隊(duì)列,所以這塊需要手工處理一下.
rabbitmq關(guān)于消息重試的配置
rabbitmq: host: xxx port: xxx username: xxx password: xxx virtual-host: xxx
###開啟消息確認(rèn)機(jī)制 confirms publisher-confirms: true publisher-returns: true listener:
simple: acknowledge-mode: manual #設(shè)置確認(rèn)方式 prefetch: 1 #每次處理1條消息
retry.max-attempts: 3 # 最大重試次數(shù) retry.enabled: true
#是否開啟消費(fèi)者重試(為false時(shí)關(guān)閉消費(fèi)者重試,這時(shí)消費(fèi)端代碼異常會(huì)一直重復(fù)收到消息) retry.initial-interval: 2000
#重試間隔時(shí)間(單位毫秒) default-requeue-rejected: true
#該配置項(xiàng)是決定由于監(jiān)聽器拋出異常而拒絕的消息是否被重新放回隊(duì)列。默認(rèn)值為true,需要手動(dòng)basicNack時(shí)這些參數(shù)諒失效了
手工實(shí)現(xiàn)消息重試并放入死信的方式
定義隊(duì)列的相關(guān)配置
/** * 創(chuàng)建普通交換機(jī). */ @Bean public TopicExchange lindExchange() { //消息持久化 return
(TopicExchange) ExchangeBuilder.topicExchange(EXCHANGE).durable(true).build();
} @Bean public TopicExchange deadExchange() { return (TopicExchange)
ExchangeBuilder.topicExchange(LIND_DL_EXCHANGE).durable(true).build(); } /** *
基于消息事務(wù)的處理方式,當(dāng)消費(fèi)失敗進(jìn)行重試,有時(shí)間間隔,當(dāng)達(dá)到超時(shí)時(shí)間,就發(fā)到死信隊(duì)列,等待人工處理. * @return */ @Bean public
Queue testQueue() { //設(shè)置死信交換機(jī) return
QueueBuilder.durable(QUEUE).withArgument("x-dead-letter-exchange",
LIND_DL_EXCHANGE) //毫秒 .withArgument("x-message-ttl", CONSUMER_EXPIRE)
//設(shè)置死信routingKey .withArgument("x-dead-letter-routing-key",
LIND_DEAD_QUEUE).build(); } @Bean public Queue deadQueue() { return new
Queue(LIND_DEAD_QUEUE); } @Bean public Binding bindBuildersRouteKey() { return
BindingBuilder.bind(testQueue()).to(lindExchange()).with(ROUTER); } @Bean
public Binding bindDeadBuildersRouteKey() { return
BindingBuilder.bind(deadQueue()).to(deadExchange()).with(LIND_DEAD_QUEUE); }
消費(fèi)者實(shí)現(xiàn)的代碼
/** * 延時(shí)隊(duì)列:不應(yīng)該有RabbitListener訂閱者,應(yīng)該讓它自己達(dá)到超時(shí)時(shí)間后自動(dòng)轉(zhuǎn)到死信里去消費(fèi) *
消息異常處理:消費(fèi)出現(xiàn)異常后,延時(shí)幾秒,然后從新入隊(duì)列消費(fèi),直到達(dá)到TTL超時(shí)時(shí)間,再轉(zhuǎn)到死信,證明這個(gè)信息有問題需要人工干預(yù) * * @param
message */ @RabbitListener(queues = MqConfig.QUEUE) public void
testSubscribe(Message message, Channel channel) throws IOException,
InterruptedException { try { System.out.println(LocalDateTime.now() +
":Subscriber:" + new String(message.getBody(), "UTF-8"));
//當(dāng)程序處理出現(xiàn)問題時(shí),消息使用basicReject上報(bào) int a = 0; int b = 1 / a;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
catch (Exception ex) { //出現(xiàn)異常手動(dòng)放回隊(duì)列 Thread.sleep(2000);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,
true); } } /** * 死信隊(duì)列. * * @param message */ @RabbitListener(queues =
MqConfig.LIND_DEAD_QUEUE) public void dealSubscribe(Message message, Channel
channel) throws IOException { System.out.println("Dead Subscriber:" + new
String(message.getBody(), "UTF-8"));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
消費(fèi)者這塊,也可以直接聲明隊(duì)列和綁定交換機(jī),直接在注解上添加 QueueBinding即可.
@RabbitListener(bindings = {@QueueBinding(value = @Queue( name =
MqConfig.QUEUE, durable = "true",arguments = {@Argument(name =
"x-dead-letter-exchange", value = MqConfig.LIND_DL_EXCHANGE), @Argument(name =
"x-message-ttl", value = MqConfig.CONSUMER_EXPIRE,type="java.lang.Long"),
@Argument(name = "x-dead-letter-routing-key", value =
MqConfig.LIND_DEAD_QUEUE)}), exchange = @Exchange(value = MqConfig.EXCHANGE,
durable = "true",type="topic") )}) public void testSubscribe(Message message,
Channel channel) throws IOException, InterruptedException { }
這邊嘗試讓消費(fèi)者執(zhí)行出錯(cuò),然后走到catch里使用basicNack方法把消息從新放里隊(duì)列里,并讓線程讓休息2秒,以避免頻繁操作,之后就是我們希望看到的代碼
2019-12-20T17:21:31.190:Subscriber:send a message to mq
2019-12-20T17:21:33.200:Subscriber:send a message to mq
2019-12-20T17:21:35.206:Subscriber:send a message to mq
2019-12-20T17:21:37.213:Subscriber:send a message to mq
2019-12-20T17:21:39.221:Subscriber:send a message to mq Dead Subscriber:send a
message to mq
這就是一個(gè)消息隊(duì)列的補(bǔ)償機(jī)制,使用死信隊(duì)列也可以實(shí)現(xiàn)延時(shí)消息的機(jī)制,有時(shí)間再給大家分享!
熱門工具 換一換