1. 引言
事務(wù)大家都知道,就是相當(dāng)于一個原子操作,要么全部執(zhí)行,要么發(fā)生異常全部回滾。但事務(wù)只限于本地事務(wù),即各個數(shù)據(jù)庫操作必須在同一數(shù)據(jù)庫下執(zhí)行。拿我最近的接手的項目來說,各個模塊全部部署于不同的服務(wù)器,都有自己獨立的數(shù)據(jù)庫。前端想要刪除一個用戶,先調(diào)用用戶平臺的刪除用戶接口,再調(diào)用權(quán)限平臺的刪除權(quán)限接口。起初覺得這樣操作沒什么問題,后來有幾次數(shù)據(jù)異常后,發(fā)現(xiàn)有的用戶信息沒有,但權(quán)限信息還存在,導(dǎo)致數(shù)據(jù)不一致。此時,就想到了用分布式事物來解決。所謂分布式事物,我個人理解是為了解決數(shù)據(jù)一致性的問題。
2. kafka+本地事物表解決分布式事務(wù)
消息隊列的產(chǎn)生是為了解決各系統(tǒng)間通信問題,因為Kafka用的比較多,此處就想到用Kafka+本地事物表去解決分布式事務(wù)問題。關(guān)于Kafka+zookeeper的搭建此處不做詳解。
?
上圖是自己基于Kafka+本地事物表實現(xiàn)的基本流程(圖自己畫的,可能不太清楚)代碼后文貼出,(上圖箭頭只代表流程,和下文的1.2.3無關(guān))此處講一下自己的思路。先申明,kafka只能保證最終一致性,并不是強一致性。我們最終目的是保證上圖2個藍色方塊的任務(wù)執(zhí)行。方便說明,假定2個系統(tǒng)A,B
分別對應(yīng)的2個數(shù)據(jù)庫A庫和B庫。其中A庫中的事務(wù)表叫做A事務(wù)表,B庫中的事務(wù)表叫做B事務(wù)表。要執(zhí)行的藍色方塊叫A業(yè)務(wù)和B業(yè)務(wù)。
1. 在A系統(tǒng)中,啟用A庫的事物,執(zhí)行如下2步操作。
1)A系統(tǒng)執(zhí)行A業(yè)務(wù)
2)A系統(tǒng)在A庫的A事物表中寫一條狀態(tài)為NEW的數(shù)據(jù)(此處數(shù)據(jù)的ID唯一)
此處啟用A庫的事務(wù),即2步操作要木全部執(zhí)行,要木不執(zhí)行。
2.
A系統(tǒng)中啟用一個定時任務(wù),5s中執(zhí)行一次,輪訓(xùn)A庫的A事物表,看是否有狀態(tài)為NEW的數(shù)據(jù),如果有,將此記錄發(fā)送到Kafka消息隊列中,并修改此條數(shù)據(jù)的狀態(tài)為Published。此時A系統(tǒng)的操作全部執(zhí)行完畢。
3.
B系統(tǒng)啟用進程拉取kafka數(shù)據(jù),如果發(fā)現(xiàn)有從A系統(tǒng)來的數(shù)據(jù),將此數(shù)據(jù)記錄到B系統(tǒng)的B事務(wù)表中,更新此數(shù)據(jù)在B系統(tǒng)的B事務(wù)表狀態(tài)為NEW(因為ID唯一,此條數(shù)據(jù)的ID和存放在A庫中的數(shù)據(jù)的ID相同,如果出現(xiàn)網(wǎng)絡(luò)異常導(dǎo)致B系統(tǒng)重復(fù)收到數(shù)據(jù),但看到自己庫中已有此ID的數(shù)據(jù),便會將重復(fù)消息棄用,此處是保證只執(zhí)行一次),更新完成后,Kafka確認提交(此處要關(guān)閉Kafka的自動提交)
4. B系統(tǒng)啟用定時任務(wù),5s執(zhí)行一次,輪訓(xùn)B庫的B事物表,看是否有狀態(tài)為NEW的數(shù)據(jù),如果有,執(zhí)行如下2步操作。
1)B系統(tǒng)執(zhí)行B業(yè)務(wù)
2)B系統(tǒng)更新B庫的B事物表,將此條狀態(tài)為New的數(shù)據(jù)改為狀態(tài)為Published
此處啟用B庫的事務(wù),即2步操作要木全部執(zhí)行,要木不執(zhí)行。
3. 實現(xiàn)代碼
相對于Kafka來說,A系統(tǒng)相當(dāng)于消息生產(chǎn)者,B系統(tǒng)相當(dāng)于消息消費者。下面為SQL建表語句。
-- A系統(tǒng)事務(wù)表 CREATE TABLE `kafka_event_publish` ( `id` bigint(20) unsigned NOT
NULL AUTO_INCREMENT, `payload` varchar(2000) NOT NULL, `eventType` varchar(30)
NOT NULL, `status` varchar(30) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB
AUTO_INCREMENT=5 DEFAULT CHARSET=utf8; -- B系統(tǒng)事務(wù)表 CREATE TABLE
`kafka_event_process` ( `id`bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`payload`varchar(2000) NOT NULL, `eventType` varchar(30) NOT NULL, `status`
varchar(30) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6
DEFAULT CHARSET=utf8;
?
Kafka用來發(fā)送消息,接收消息,下面為Kafka的配置類。
package com.boot.util; // 消費者消息狀態(tài) public enum EventProcessStatus { NEW,
PROCESSED;private EventProcessStatus() { } }
--------------------------------------package com.boot.util; // 生產(chǎn)者消息狀態(tài) public
enum EventPublishStatus { NEW, PUBLISHED; private EventPublishStatus() { } }
---------------------------------------package com.boot.util; // Kafka主題 public
enum EventType { USER_CREATED; private EventType() { } } package com.boot.util;
import java.util.Arrays; import java.util.Iterator; import java.util.Properties;
import java.util.concurrent.ExecutionException; import
java.util.function.Consumer;import
org.apache.kafka.clients.consumer.CommitFailedException;import
org.apache.kafka.clients.consumer.ConsumerRecord;import
org.apache.kafka.clients.consumer.ConsumerRecords;import
org.apache.kafka.clients.consumer.KafkaConsumer;import
org.apache.kafka.clients.producer.KafkaProducer;import
org.apache.kafka.clients.producer.Producer;import
org.apache.kafka.clients.producer.ProducerRecord;// kafka工具類 public class
KafkaUtil {private static Producer<String, String> producer; private static
KafkaConsumer<String, String> consumer; public KafkaUtil() { } //
Kafka發(fā)送消息,topic為主題,value為具體消息 public static void sendSync(String topic, String
value)throws ExecutionException, InterruptedException { producer.send(new
ProducerRecord(topic, value)).get(); }// Kafka接收消息 public static void
consume(Consumer<String> c) { // 訂閱主題為USER_CREATED的消息
consumer.subscribe(Arrays.asList(EventType.USER_CREATED.name()));while(true) {
ConsumerRecords<String, String> records = consumer.poll(100L); Iterator var2 =
records.iterator();while(var2.hasNext()) { ConsumerRecord<String, String>
record = (ConsumerRecord)var2.next(); System.out.println(record);
c.accept(record.value()); }try { consumer.commitSync(); } catch
(CommitFailedException var4) { System.out.println("Kafka消費者提交offset失敗"); } } }
// kafka基礎(chǔ)配置 static { Properties producerProps = new Properties();
producerProps.put("bootstrap.servers",
"10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092"); producerProps.put(
"key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer"); producer = new
KafkaProducer(producerProps); Properties consumerProps= new Properties();
consumerProps.put("bootstrap.servers",
"10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092"); consumerProps.put(
"key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"
); consumerProps.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer"); consumerProps.put(
"group.id", "VoucherGroup"); consumerProps.put("enable.auto.commit", "false");
consumer= new KafkaConsumer(consumerProps); } }
A系統(tǒng)主要執(zhí)行的操作有
1)執(zhí)行業(yè)務(wù)操作,2)插入New消息到數(shù)據(jù)庫,3)定時任務(wù)輪訓(xùn)數(shù)據(jù)庫為New的數(shù)據(jù),4)發(fā)送到Kafka中,5)修改數(shù)據(jù)庫消息狀態(tài)為Published。此處1),2)步操作不貼代碼。下面為A系統(tǒng)中(即生產(chǎn)者)代碼。
import com.boot.kafka.transaction.EventPublishService; import
org.springframework.scheduling.annotation.Scheduled;import
org.springframework.stereotype.Component;import javax.annotation.Resource; /**
* @Author xiabing5 * @Create 2019/8/2 10:13 * @Desc spring定時器,定時向kafka中發(fā)送事物消息 *
*/ @Component public class EventPublishSchedule { @Resource private
EventPublishService eventPublishService;/* * 每N毫秒執(zhí)行一次*/ @Scheduled(fixedRate =
5000) private void publish() { eventPublishService.publish(); } } import
com.boot.mapper.KafkaEventPublishMapper;import com.boot.pojo.KafkaEventPublish;
import com.boot.util.EventPublishStatus; import com.boot.util.KafkaUtil; import
org.springframework.stereotype.Service;import
org.springframework.transaction.annotation.Transactional;import
org.springframework.util.CollectionUtils;import javax.annotation.Resource;
import java.util.*; /** * @Author xiabing5 * @Create 2019/8/2 9:34 * @Desc
kafka解決分布式事物(消息發(fā)送端) **/ @Service public class EventPublishService { @Resource
private KafkaEventPublishMapper eventPublishMapper; // 事務(wù)表的Mapper
@Transactional(rollbackFor= Exception.class) public void publish() { //
查詢所有狀態(tài)為NEW的事件 Map<String,Object> params = new HashMap<String,Object>();
params.put("status", EventPublishStatus.NEW.name()); List<KafkaEventPublish>
eventPublishList = eventPublishMapper.selectEventPublish(params); if(!
CollectionUtils.isEmpty(eventPublishList)) {// 發(fā)送消息隊列 List<Long> ids =
sendEventPublish(eventPublishList);if (!CollectionUtils.isEmpty(ids)) { //
更新數(shù)據(jù)庫狀態(tài)為PUBLISHED eventPublishMapper.updateEventStatus(ids,
EventPublishStatus.PUBLISHED.name()); } } }/** * @Author xiabing5 * @Create
2019/8/2 10:32 * @Desc 發(fā)送EventPublish對象集合 返回發(fā)送成功的EventPublish的ID集合 **/ private
static List<Long> sendEventPublish(List<KafkaEventPublish> kafkaEventPublishes)
{if(CollectionUtils.isEmpty(kafkaEventPublishes)) { return
Collections.emptyList(); } List<Long> ids = new ArrayList<Long>(); for
(KafkaEventPublish kafkaEventPublish : kafkaEventPublishes) {try {
KafkaUtil.sendSync(kafkaEventPublish.getEventType().name(),kafkaEventPublish.getPayload());
ids.add(kafkaEventPublish.getId()); System.out.println("發(fā)送kafka消息成功"); } catch
(Exception e) { System.out.println("發(fā)送kafka消息失敗 "+ kafkaEventPublish); } }
return ids; } }
B系統(tǒng)主要執(zhí)行的操作有,1)從kafka中拉取數(shù)據(jù) ,2)將此數(shù)據(jù)放入數(shù)據(jù)庫事務(wù)表,更新狀態(tài)為New ,3)
定時任務(wù)輪詢狀態(tài)為New的數(shù)據(jù),執(zhí)行相應(yīng)業(yè)務(wù)操作,4)更新New數(shù)據(jù)狀態(tài)為Complete 。下面為B系統(tǒng)中(即消費者)代碼。
import com.boot.kafka.transaction.EventProcessService; import
org.springframework.scheduling.annotation.Scheduled;import
org.springframework.stereotype.Component;import javax.annotation.Resource; //
消費者定時任務(wù) @Component public class EventProcessSchedule { @Resource private
EventProcessService eventProcessService; @Scheduled(fixedRate= 5000) private
void process() { eventProcessService.process(); } } import
com.boot.mapper.KafkaEventProcessMapper;import com.boot.pojo.KafkaEventProcess;
import com.boot.util.EventProcessStatus; import com.boot.util.EventType; import
com.boot.util.KafkaUtil;import
com.google.common.util.concurrent.ThreadFactoryBuilder;import
org.springframework.stereotype.Service;import
org.springframework.transaction.annotation.Transactional;import
javax.annotation.PostConstruct;import javax.annotation.Resource; import
java.util.HashMap;import java.util.List; import java.util.Map; import
java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors;
/** * @Author xiabing5 * @Create 2019/8/2 13:37 * @Desc 接收kafka消息service類 **/
@Servicepublic class EventProcessService { @Resource private
KafkaEventProcessMapper kafkaEventProcessMapper;// 創(chuàng)建單一線程線程池 @PostConstruct
public void init() { ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("MqMessageConsumerThread-%d") .setDaemon(true) .build();
ExecutorService executorService=
Executors.newSingleThreadExecutor(threadFactory); executorService.execute(new
MqMessageConsumerThread()); }// 自定義接收線程 private class MqMessageConsumerThread
implements Runnable { @Override public void run() {
KafkaUtil.consume(consumerRecord-> { KafkaEventProcess kafkaEventProcess = new
KafkaEventProcess(); kafkaEventProcess .setPayload(consumerRecord);
kafkaEventProcess .setEventType(EventType.USER_CREATED); kafkaEventProcess
.setStatus(EventProcessStatus.NEW);
kafkaEventProcessMapper.insertEventProcess(kafkaEventProcess); }); } }//
執(zhí)行業(yè)務(wù)邏輯操作 @Transactional(rollbackFor = Exception.class) public void process() {
// 查詢表中狀態(tài)為new的事件 Map<String,Object> params = new HashMap<String,Object>();
params.put("status",EventProcessStatus.NEW.name()); List<KafkaEventProcess>
kafkaEventProcessList = kafkaEventProcessMapper.selectEventProcess(params); for
(KafkaEventProcess kafkaEventProcess : kafkaEventProcessList) {// 執(zhí)行業(yè)務(wù)操作
System.out.println("刪除你"); } List<Long> ids =
kafkaEventProcessList.stream().map(item ->
item.getId()).collect(Collectors.toList());
kafkaEventProcessMapper.updateEventStatus(ids,EventProcessStatus.PROCESSED.name());
} }
補充:此處沒有貼事務(wù)表的sql語句(即Mapper.xml)無非是添加數(shù)據(jù)庫記錄,更新記錄狀態(tài)語句。此代碼在我的實踐中能運行。
4. 總結(jié)
分布式問題一直是我最近比較棘手問題,如分布式鎖,定時任務(wù)在集群下重復(fù)執(zhí)行等。自己也是個小白,希望通過每次實踐后,能總結(jié)出點東西,便于以后去遍歷。
?
熱門工具 換一換