<ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>


      這幾天很忙,但是我現(xiàn)在給我的要求是一周至少要出一篇文章,所以先拿這篇筆記來做開胃菜,源碼分析估計明后兩天應(yīng)該能寫一篇。給自己加油~,即使沒什么人看。

      可靠性

      如何保證消息不丟失

      Kafka只對“已提交”的消息(committed message)做有限度的持久化保證。

      已提交的消息
      當Kafka的若干個Broker成功地接收到一條消息并寫入到日志文件后,它們會告訴生產(chǎn)者程序這條消息已成功提交。

      有限度的持久化保證
      假如一條消息保存在N個Kafka Broker上,那么至少這N個Broker至少有一個存活,才能保證消息不丟失。

      丟失數(shù)據(jù)案例

      生產(chǎn)者程序丟失數(shù)據(jù)

      由于Kafka Producer是異步發(fā)送的,調(diào)用完producer.send(msg)并不能認為消息已經(jīng)發(fā)送成功。


      所以,在Producer永遠要使用帶有回調(diào)通知的發(fā)送API,使用producer.send(msg,callback)。一旦出現(xiàn)消息提交失敗的情況,可以由針對性地進行處理。

      消費者端丟失數(shù)據(jù)

      消費者是先更新offset,再消費消息。如果這個時候消費者突然宕機了,那么這條消息就會丟失。

      所以我們要先消費消息,再更新offset位置。但是這樣會導(dǎo)致消息重復(fù)消費。


      還有一種情況就是consumer獲取到消息后開啟了多個線程異步處理消息,而consumer自動地向前更新offset。假如其中某個線程運行失敗了,那么消息就丟失了。

      遇到這樣的情況,consumer不要開啟自動提交位移,而是要應(yīng)用程序手動提交位移。

      最佳實現(xiàn)

      * 使用producer.send(msg,callback)。
      * 設(shè)置acks = all。acks是Producer的參數(shù),代表了所有副本Broker都要接收到消息,該消息才算是“已提交”。
      * 設(shè)置retries為一個較大的值。是Producer的參數(shù),對應(yīng)Producer自動重試。如果出現(xiàn)網(wǎng)絡(luò)抖動,那么可以自動重試消息發(fā)送,避免消息丟失。
      * unclean.leader.election.enable =
      false。控制有哪些Broker有資格競選分區(qū)的Leader。表示不允許落后太多的Broker競選Leader。
      * 設(shè)置replication.factor>=3。Broker參數(shù),冗余Broker。
      * 設(shè)置min.insync.replicas>1。Broker參數(shù)??刂葡⒅辽僖粚懭氲蕉嗌賯€副本才算是“已提交”。
      *
      確保replication.factor>min.insync.replicas。如果兩個相等,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了。推薦設(shè)置成replication.factor=min.insync.replicas+1.
      * 確保消息消費完成在提交。Consumer端參數(shù)enbale.auto.commit,設(shè)置成false,手動提交位移。
      解釋第二條和第六條:

      如果ISR中只有1個副本了,acks=all也就相當于acks=1了,引入min.insync.replicas的目的就是為了做一個下限的限制:不能只滿足于ISR全部寫入,還要保證ISR中的寫入個數(shù)不少于min.insync.replicas。

      冪等性


      在0.11.0.0版本引入了創(chuàng)建冪等性Producer的功能。僅需要設(shè)置props.put(“enable.idempotence”,true),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。


      enable.idempotence設(shè)置成true后,Producer自動升級成冪等性Producer。Kafka會自動去重。Broker會多保存一些字段。當Producer發(fā)送了相同字段值的消息后,Broker能夠自動知曉這些消息已經(jīng)重復(fù)了。

      作用范圍:

      * 只能保證單分區(qū)上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分區(qū)上不出現(xiàn)重復(fù)消息。
      * 只能實現(xiàn)單回話上的冪等性,這里的會話指的是Producer進程的一次運行。當重啟了Producer進程之后,冪等性不保證。
      事務(wù)

      Kafka在0.11版本開始提供對事務(wù)的支持,提供是read
      committed隔離級別的事務(wù)。保證多條消息原子性地寫入到目標分區(qū),同時也能保證Consumer只能看到事務(wù)成功提交的消息。

      事務(wù)性Producer

      保證多條消息原子性地寫入到多個分區(qū)中。這批消息要么全部成功,要不全部失敗。事務(wù)性Producer也不懼進程重啟。

      Producer端的設(shè)置:

      * 開啟enable.idempotence = true
      * 設(shè)置Producer端參數(shù) transactional.id

      除此之外,還要加上調(diào)用事務(wù)API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分別應(yīng)對事務(wù)的初始化、事務(wù)開始、事務(wù)提交以及事務(wù)終止。
      如下:
      producer.initTransactions(); try { producer.beginTransaction();
      producer.send(record1); producer.send(record2); producer.commitTransaction(); }
      catch (KafkaException e) { producer.abortTransaction(); }
      這段代碼能保證record1和record2被當做一個事務(wù)同一提交到Kafka,要么全部成功,要么全部寫入失敗。

      Consumer端的設(shè)置:
      設(shè)置isolation.level參數(shù),目前有兩個取值:

      * read_uncommitted:默認值表明Consumer端無論事務(wù)型Producer提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取。
      *
      read_committed:表明Consumer只會讀取事務(wù)型Producer成功提交事務(wù)寫入的消息。注意,非事務(wù)型Producer寫入的所有消息都能看到。

      友情鏈接
      ioDraw流程圖
      API參考文檔
      OK工具箱
      云服務(wù)器優(yōu)惠
      阿里云優(yōu)惠券
      騰訊云優(yōu)惠券
      京東云優(yōu)惠券
      站點信息
      問題反饋
      郵箱:[email protected]
      QQ群:637538335
      關(guān)注微信

        <ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>
          波多野结衣三级在线播放 | 香蕉视频在线观看www | 亚洲剧情在线观看 | 色五月天在线视频 | 豆花视频一区二区三区在线观看 | 亚洲淫乱网 | 青青草在线视频免费公开视频 | 放荡勾人引诱h | 伊人久色 | 黄片靠逼|