1. Introduction

           
Everyone knows what a transaction is: essentially an atomic operation in which either everything executes or, on an exception, everything rolls back. But such transactions are limited to local transactions, i.e. all the database operations must run against the same database. Take the project I recently took over: its modules are deployed on different servers, each with its own database. To delete a user, the front end first calls the user platform's delete-user interface and then calls the permission platform's delete-permission interface. At first this seemed fine, but after a few data anomalies we found users whose user information was gone while their permission information still existed, leaving the data inconsistent. That is when distributed transactions came to mind. As I understand it, a distributed transaction exists to solve exactly this data-consistency problem.

2. Solving distributed transactions with Kafka plus a local transaction table

            
Message queues exist to solve communication between systems, and since I use Kafka a lot, I decided to use Kafka plus a local transaction table to tackle the distributed transaction problem. Setting up Kafka and ZooKeeper is not covered here.

[Figure: overall flow of the Kafka + local transaction table approach]


The figure above shows the basic flow I implemented with Kafka plus local transaction tables (I drew it myself, so it may not be very clear); the code appears later in this post. The arrows in the figure only indicate the flow and have nothing to do with steps 1 to 4 below. Here is the idea. To state it up front: Kafka can only guarantee eventual consistency, not strong consistency. The ultimate goal is to make sure the tasks in the two blue boxes in the figure are both executed. For convenience, assume two systems A and B with their own databases, database A and database B. The transaction table in database A is called the A transaction table, and the one in database B is called the B transaction table; the blue boxes to be executed are called business A and business B.

1. In system A, open a local transaction on database A and perform the following two operations:

   1) System A executes business A.

   2) System A inserts a record with status NEW into the A transaction table in database A (this record's ID is unique).

   Because database A's local transaction covers both steps, either both are executed or neither is.

2. System A runs a scheduled task every 5 seconds that polls the A transaction table in database A for records with status NEW. If any are found, it sends them to the Kafka message queue and then updates their status to PUBLISHED. At this point system A's part of the work is complete.

3. System B runs a thread that pulls data from Kafka. When it receives data originating from system A, it records the data in the B transaction table in database B with status NEW. Because the ID is unique (the record carries the same ID as the one stored in database A), if a network problem causes system B to receive the same data twice, it will find that a record with this ID already exists in its own database and discard the duplicate message; this is what guarantees the message is processed only once. After the record has been written, the Kafka offset is committed (Kafka's auto-commit must be disabled for this).

4. System B runs a scheduled task every 5 seconds that polls the B transaction table in database B for records with status NEW. If any are found, it performs the following two operations:

   1) System B executes business B.

   2) System B updates the B transaction table in database B, changing the record's status from NEW to PROCESSED.

   Because database B's local transaction covers both steps, either both are executed or neither is.

3. Implementation code

From Kafka's point of view, system A acts as the message producer and system B as the message consumer. The SQL statements that create the two transaction tables are below.
-- Transaction table for system A (the producer side)
CREATE TABLE `kafka_event_publish` (
  `id`        bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `payload`   varchar(2000) NOT NULL,
  `eventType` varchar(30) NOT NULL,
  `status`    varchar(30) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8;

-- Transaction table for system B (the consumer side)
CREATE TABLE `kafka_event_process` (
  `id`        bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  `payload`   varchar(2000) NOT NULL,
  `eventType` varchar(30) NOT NULL,
  `status`    varchar(30) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
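
The service code below also references two entity classes, com.boot.pojo.KafkaEventPublish and com.boot.pojo.KafkaEventProcess, which the original post never shows. Here is a minimal sketch of what they might look like, assuming the enum fields are persisted as their names in the varchar columns (MyBatis's default enum handling); the shape is inferred from the table definitions above and from the getters and setters used later.

package com.boot.pojo;

import com.boot.util.EventPublishStatus;
import com.boot.util.EventType;

// Hypothetical sketch of the entity behind kafka_event_publish; not shown in the original post
public class KafkaEventPublish {
    private Long id;                   // primary key, also the unique event ID
    private String payload;            // message body sent to Kafka
    private EventType eventType;       // Kafka topic, persisted as the enum name
    private EventPublishStatus status; // NEW or PUBLISHED

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getPayload() { return payload; }
    public void setPayload(String payload) { this.payload = payload; }
    public EventType getEventType() { return eventType; }
    public void setEventType(EventType eventType) { this.eventType = eventType; }
    public EventPublishStatus getStatus() { return status; }
    public void setStatus(EventPublishStatus status) { this.status = status; }
}

// KafkaEventProcess (behind kafka_event_process) has the same shape,
// with EventProcessStatus (NEW / PROCESSED) in place of EventPublishStatus.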

Kafka is used to send and receive the messages; the Kafka-related classes (the status enums, the topic enum, and a small utility class) are below.
package com.boot.util;

// Consumer-side message status
public enum EventProcessStatus {
    NEW, PROCESSED
}

package com.boot.util;

// Producer-side message status
public enum EventPublishStatus {
    NEW, PUBLISHED
}

package com.boot.util;

// Kafka topics (event types)
public enum EventType {
    USER_CREATED
}

package com.boot.util;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Kafka utility class
public class KafkaUtil {

    private static Producer<String, String> producer;
    private static KafkaConsumer<String, String> consumer;

    // Basic Kafka configuration
    static {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "10.113.56.68:9093,10.113.56.68:9094,10.113.56.68:9092");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("group.id", "VoucherGroup");
        consumerProps.put("enable.auto.commit", "false"); // offsets are committed manually after the event is stored
        consumer = new KafkaConsumer<>(consumerProps);
    }

    // Send a message synchronously; topic is the topic name, value is the message body
    public static void sendSync(String topic, String value) throws ExecutionException, InterruptedException {
        producer.send(new ProducerRecord<>(topic, value)).get();
    }

    // Receive messages and pass each message value to the given callback
    public static void consume(Consumer<String> c) {
        // Subscribe to the USER_CREATED topic
        consumer.subscribe(Arrays.asList(EventType.USER_CREATED.name()));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100L);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record);
                c.accept(record.value());
            }
            try {
                consumer.commitSync();
            } catch (CommitFailedException e) {
                System.out.println("Kafka consumer failed to commit offsets");
            }
        }
    }
}
The main operations in system A are: 1) execute the business operation, 2) insert a NEW message into the database, 3) a scheduled task polls the database for NEW records, 4) the records are sent to Kafka, and 5) the message status in the database is updated to PUBLISHED. Code for steps 1) and 2) is not included here (a rough sketch is given after the producer code below). The code for system A (the producer side) follows.
import com.boot.kafka.transaction.EventPublishService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * @Author xiabing5  @Create 2019/8/2 10:13
 * @Desc Spring scheduled task that periodically pushes transactional messages to Kafka
 */
@Component
public class EventPublishSchedule {
    @Resource
    private EventPublishService eventPublishService;

    // Runs every 5000 ms
    @Scheduled(fixedRate = 5000)
    private void publish() {
        eventPublishService.publish();
    }
}

import com.boot.mapper.KafkaEventPublishMapper;
import com.boot.pojo.KafkaEventPublish;
import com.boot.util.EventPublishStatus;
import com.boot.util.KafkaUtil;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.Resource;
import java.util.*;

/**
 * @Author xiabing5  @Create 2019/8/2 9:34
 * @Desc Producer side of the Kafka-based distributed transaction
 */
@Service
public class EventPublishService {
    @Resource
    private KafkaEventPublishMapper eventPublishMapper; // mapper for the A transaction table

    @Transactional(rollbackFor = Exception.class)
    public void publish() {
        // Query all events with status NEW
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("status", EventPublishStatus.NEW.name());
        List<KafkaEventPublish> eventPublishList = eventPublishMapper.selectEventPublish(params);
        if (!CollectionUtils.isEmpty(eventPublishList)) {
            // Send them to the message queue
            List<Long> ids = sendEventPublish(eventPublishList);
            if (!CollectionUtils.isEmpty(ids)) {
                // Mark the successfully sent events as PUBLISHED
                eventPublishMapper.updateEventStatus(ids, EventPublishStatus.PUBLISHED.name());
            }
        }
    }

    /**
     * @Author xiabing5  @Create 2019/8/2 10:32
     * @Desc Sends a list of EventPublish objects and returns the IDs of those sent successfully
     */
    private static List<Long> sendEventPublish(List<KafkaEventPublish> kafkaEventPublishes) {
        if (CollectionUtils.isEmpty(kafkaEventPublishes)) {
            return Collections.emptyList();
        }
        List<Long> ids = new ArrayList<Long>();
        for (KafkaEventPublish kafkaEventPublish : kafkaEventPublishes) {
            try {
                KafkaUtil.sendSync(kafkaEventPublish.getEventType().name(), kafkaEventPublish.getPayload());
                ids.add(kafkaEventPublish.getId());
                System.out.println("Kafka message sent successfully");
            } catch (Exception e) {
                System.out.println("Failed to send Kafka message " + kafkaEventPublish);
            }
        }
        return ids;
    }
}
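
Steps 1) and 2) on the A side were not shown above; here is a minimal sketch of what they might look like, i.e. the business operation and the NEW event record written inside one local transaction on database A. The service name, the commented-out userMapper call, and the insertEventPublish mapper method are hypothetical; only the pattern itself (both writes in a single @Transactional method) comes from the flow described in section 2.

import com.boot.mapper.KafkaEventPublishMapper;
import com.boot.pojo.KafkaEventPublish;
import com.boot.util.EventPublishStatus;
import com.boot.util.EventType;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;

// Hypothetical sketch: business A and the NEW event record written in one local transaction on database A
@Service
public class UserService {
    @Resource
    private KafkaEventPublishMapper eventPublishMapper; // mapper for the A transaction table

    @Transactional(rollbackFor = Exception.class)
    public void deleteUser(Long userId) {
        // 1) execute business A, e.g. delete the user row (userMapper is hypothetical and omitted here)
        // userMapper.deleteById(userId);

        // 2) in the same local transaction, record a NEW event for system B to pick up
        KafkaEventPublish event = new KafkaEventPublish();
        event.setPayload(String.valueOf(userId));     // payload carried to system B
        event.setEventType(EventType.USER_CREATED);   // the only topic defined in EventType above
        event.setStatus(EventPublishStatus.NEW);
        eventPublishMapper.insertEventPublish(event); // hypothetical insert method on the mapper

        // If either write fails, the local transaction rolls back and neither change is visible.
    }
}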
The main operations in system B are: 1) pull data from Kafka, 2) store the data in the database transaction table with status NEW, 3) a scheduled task polls for NEW records and performs the corresponding business operation, and 4) the NEW records are updated to status PROCESSED. The code for system B (the consumer side) follows.
import com.boot.kafka.transaction.EventProcessService;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

// Consumer-side scheduled task
@Component
public class EventProcessSchedule {
    @Resource
    private EventProcessService eventProcessService;

    @Scheduled(fixedRate = 5000)
    private void process() {
        eventProcessService.process();
    }
}

import com.boot.mapper.KafkaEventProcessMapper;
import com.boot.pojo.KafkaEventProcess;
import com.boot.util.EventProcessStatus;
import com.boot.util.EventType;
import com.boot.util.KafkaUtil;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

/**
 * @Author xiabing5  @Create 2019/8/2 13:37
 * @Desc Consumer side: receives Kafka messages and processes the stored events
 */
@Service
public class EventProcessService {
    @Resource
    private KafkaEventProcessMapper kafkaEventProcessMapper; // mapper for the B transaction table

    // Start a single-threaded pool that runs the Kafka consumer loop
    @PostConstruct
    public void init() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("MqMessageConsumerThread-%d")
                .setDaemon(true)
                .build();
        ExecutorService executorService = Executors.newSingleThreadExecutor(threadFactory);
        executorService.execute(new MqMessageConsumerThread());
    }

    // Custom consumer thread: stores every received message in the B transaction table with status NEW
    private class MqMessageConsumerThread implements Runnable {
        @Override
        public void run() {
            KafkaUtil.consume(consumerRecord -> {
                KafkaEventProcess kafkaEventProcess = new KafkaEventProcess();
                kafkaEventProcess.setPayload(consumerRecord);
                kafkaEventProcess.setEventType(EventType.USER_CREATED);
                kafkaEventProcess.setStatus(EventProcessStatus.NEW);
                kafkaEventProcessMapper.insertEventProcess(kafkaEventProcess);
            });
        }
    }

    // Execute the business logic for every NEW event, then mark it as PROCESSED
    @Transactional(rollbackFor = Exception.class)
    public void process() {
        // Query events with status NEW
        Map<String, Object> params = new HashMap<String, Object>();
        params.put("status", EventProcessStatus.NEW.name());
        List<KafkaEventProcess> kafkaEventProcessList = kafkaEventProcessMapper.selectEventProcess(params);
        if (CollectionUtils.isEmpty(kafkaEventProcessList)) {
            return; // nothing to process (also avoids an empty IN () clause in the update below)
        }
        for (KafkaEventProcess kafkaEventProcess : kafkaEventProcessList) {
            // Execute the business operation (placeholder)
            System.out.println("delete the user");
        }
        List<Long> ids = kafkaEventProcessList.stream().map(item -> item.getId()).collect(Collectors.toList());
        kafkaEventProcessMapper.updateEventStatus(ids, EventProcessStatus.PROCESSED.name());
    }
}
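
Step 3 of the flow relies on the event ID being unique so that a duplicated delivery can be detected and discarded, but the consumer thread above simply inserts every record it receives. Below is a minimal sketch of how that duplicate check might look; it assumes the producer puts its event ID in front of the payload ("eventId|payload") and that the mapper has a selectEventProcessById lookup, neither of which is part of the original code.

// Hypothetical variant of MqMessageConsumerThread.run() with the duplicate check from step 3.
// Assumption: the producer sends "eventId|payload", so the A-side event ID travels with the message.
private class MqMessageConsumerThread implements Runnable {
    @Override
    public void run() {
        KafkaUtil.consume(consumerRecord -> {
            String[] parts = consumerRecord.split("\\|", 2);
            Long eventId = Long.valueOf(parts[0]);
            if (kafkaEventProcessMapper.selectEventProcessById(eventId) != null) { // hypothetical lookup by ID
                return; // this ID already exists in the B transaction table: duplicate delivery, discard it
            }
            KafkaEventProcess kafkaEventProcess = new KafkaEventProcess();
            kafkaEventProcess.setId(eventId);       // reuse the A-side ID so later duplicates are detected
            kafkaEventProcess.setPayload(parts[1]);
            kafkaEventProcess.setEventType(EventType.USER_CREATED);
            kafkaEventProcess.setStatus(EventProcessStatus.NEW);
            kafkaEventProcessMapper.insertEventProcess(kafkaEventProcess); // the insert statement must also write the id column
        });
    }
}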
Note: the SQL statements for the transaction tables (i.e. the Mapper.xml files) are not included here; they are nothing more than statements that insert a record and update a record's status. A rough sketch of an equivalent mapper interface is given below. This code ran correctly in my own testing.
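
For reference, such a mapper could look roughly like the sketch below, written with annotation-based MyBatis instead of the Mapper.xml the post actually uses; the method signatures are inferred from the service code above, and the SQL itself is an assumption.

package com.boot.mapper;

import com.boot.pojo.KafkaEventPublish;
import org.apache.ibatis.annotations.*;
import java.util.List;
import java.util.Map;

// Hypothetical mapper for kafka_event_publish; the original project keeps these statements in Mapper.xml
@Mapper
public interface KafkaEventPublishMapper {

    // All events with the given status (params contains the key "status")
    @Select("SELECT id, payload, eventType, status FROM kafka_event_publish WHERE status = #{status}")
    List<KafkaEventPublish> selectEventPublish(Map<String, Object> params);

    // Insert a NEW event row (used by the business-side sketch earlier)
    @Insert("INSERT INTO kafka_event_publish (payload, eventType, status) VALUES (#{payload}, #{eventType}, #{status})")
    int insertEventPublish(KafkaEventPublish event);

    // Mark the given IDs with the given status, e.g. PUBLISHED
    @Update("<script>UPDATE kafka_event_publish SET status = #{status} WHERE id IN "
          + "<foreach collection='ids' item='id' open='(' separator=',' close=')'>#{id}</foreach></script>")
    int updateEventStatus(@Param("ids") List<Long> ids, @Param("status") String status);
}

// KafkaEventProcessMapper for kafka_event_process would follow the same pattern
// (selectEventProcess, insertEventProcess, updateEventStatus).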

4. Summary

Distributed problems have been the thorniest issues for me recently, for example distributed locks and scheduled tasks being executed repeatedly across a cluster. I am still something of a beginner, and I hope that by writing up a small summary after each hands-on attempt I will have something to look back on later.

            
