這幾天很忙,但是我現(xiàn)在給我的要求是一周至少要出一篇文章,所以先拿這篇筆記來做開胃菜,源碼分析估計明后兩天應(yīng)該能寫一篇。給自己加油~,即使沒什么人看。
可靠性
如何保證消息不丟失
Kafka只對“已提交”的消息(committed message)做有限度的持久化保證。
已提交的消息
當Kafka的若干個Broker成功地接收到一條消息并寫入到日志文件后,它們會告訴生產(chǎn)者程序這條消息已成功提交。
有限度的持久化保證
假如一條消息保存在N個Kafka Broker上,那么至少這N個Broker至少有一個存活,才能保證消息不丟失。
丟失數(shù)據(jù)案例
生產(chǎn)者程序丟失數(shù)據(jù)
由于Kafka Producer是異步發(fā)送的,調(diào)用完producer.send(msg)并不能認為消息已經(jīng)發(fā)送成功。
所以,在Producer永遠要使用帶有回調(diào)通知的發(fā)送API,使用producer.send(msg,callback)。一旦出現(xiàn)消息提交失敗的情況,可以由針對性地進行處理。
消費者端丟失數(shù)據(jù)
消費者是先更新offset,再消費消息。如果這個時候消費者突然宕機了,那么這條消息就會丟失。
所以我們要先消費消息,再更新offset位置。但是這樣會導(dǎo)致消息重復(fù)消費。
還有一種情況就是consumer獲取到消息后開啟了多個線程異步處理消息,而consumer自動地向前更新offset。假如其中某個線程運行失敗了,那么消息就丟失了。
遇到這樣的情況,consumer不要開啟自動提交位移,而是要應(yīng)用程序手動提交位移。
最佳實現(xiàn)
* 使用producer.send(msg,callback)。
* 設(shè)置acks = all。acks是Producer的參數(shù),代表了所有副本Broker都要接收到消息,該消息才算是“已提交”。
* 設(shè)置retries為一個較大的值。是Producer的參數(shù),對應(yīng)Producer自動重試。如果出現(xiàn)網(wǎng)絡(luò)抖動,那么可以自動重試消息發(fā)送,避免消息丟失。
* unclean.leader.election.enable =
false。控制有哪些Broker有資格競選分區(qū)的Leader。表示不允許落后太多的Broker競選Leader。
* 設(shè)置replication.factor>=3。Broker參數(shù),冗余Broker。
* 設(shè)置min.insync.replicas>1。Broker參數(shù)??刂葡⒅辽僖粚懭氲蕉嗌賯€副本才算是“已提交”。
*
確保replication.factor>min.insync.replicas。如果兩個相等,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了。推薦設(shè)置成replication.factor=min.insync.replicas+1.
* 確保消息消費完成在提交。Consumer端參數(shù)enbale.auto.commit,設(shè)置成false,手動提交位移。
解釋第二條和第六條:
如果ISR中只有1個副本了,acks=all也就相當于acks=1了,引入min.insync.replicas的目的就是為了做一個下限的限制:不能只滿足于ISR全部寫入,還要保證ISR中的寫入個數(shù)不少于min.insync.replicas。
冪等性
在0.11.0.0版本引入了創(chuàng)建冪等性Producer的功能。僅需要設(shè)置props.put(“enable.idempotence”,true),或props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)。
enable.idempotence設(shè)置成true后,Producer自動升級成冪等性Producer。Kafka會自動去重。Broker會多保存一些字段。當Producer發(fā)送了相同字段值的消息后,Broker能夠自動知曉這些消息已經(jīng)重復(fù)了。
作用范圍:
* 只能保證單分區(qū)上的冪等性,即一個冪等性Producer能夠保證某個主題的一個分區(qū)上不出現(xiàn)重復(fù)消息。
* 只能實現(xiàn)單回話上的冪等性,這里的會話指的是Producer進程的一次運行。當重啟了Producer進程之后,冪等性不保證。
事務(wù)
Kafka在0.11版本開始提供對事務(wù)的支持,提供是read
committed隔離級別的事務(wù)。保證多條消息原子性地寫入到目標分區(qū),同時也能保證Consumer只能看到事務(wù)成功提交的消息。
事務(wù)性Producer
保證多條消息原子性地寫入到多個分區(qū)中。這批消息要么全部成功,要不全部失敗。事務(wù)性Producer也不懼進程重啟。
Producer端的設(shè)置:
* 開啟enable.idempotence = true
* 設(shè)置Producer端參數(shù) transactional.id
除此之外,還要加上調(diào)用事務(wù)API,如initTransaction、beginTransaction、commitTransaction和abortTransaction,分別應(yīng)對事務(wù)的初始化、事務(wù)開始、事務(wù)提交以及事務(wù)終止。
如下:
producer.initTransactions(); try { producer.beginTransaction();
producer.send(record1); producer.send(record2); producer.commitTransaction(); }
catch (KafkaException e) { producer.abortTransaction(); }
這段代碼能保證record1和record2被當做一個事務(wù)同一提交到Kafka,要么全部成功,要么全部寫入失敗。
Consumer端的設(shè)置:
設(shè)置isolation.level參數(shù),目前有兩個取值:
* read_uncommitted:默認值表明Consumer端無論事務(wù)型Producer提交事務(wù)還是終止事務(wù),其寫入的消息都可以讀取。
*
read_committed:表明Consumer只會讀取事務(wù)型Producer成功提交事務(wù)寫入的消息。注意,非事務(wù)型Producer寫入的所有消息都能看到。
熱門工具 換一換