一、kafka概述
1.1、定義
Kakfa是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(message queue),主要應(yīng)用于大數(shù)據(jù)的實(shí)時(shí)處理領(lǐng)域
1.2、消息隊(duì)列
1.2.1、傳統(tǒng)的消息隊(duì)列&新式的消息隊(duì)列的模式
?
?
?
上面是傳統(tǒng)的消息隊(duì)列,比如一個(gè)用戶要注冊(cè)信息,當(dāng)用戶信息寫入數(shù)據(jù)庫(kù)后,后面還有一些其他流程,比如發(fā)送短信,則需要等這些流程處理完成后,在返回給用戶
而新式的隊(duì)列是,比如一個(gè)用戶注冊(cè)信息,數(shù)據(jù)直接丟進(jìn)數(shù)據(jù)庫(kù),就直接返回給用戶成功
1.2.2、使用消息隊(duì)列的好處
A、??????? 解耦
B、??????? 可恢復(fù)性
C、??????? 緩沖
D、??????? 靈活性&峰值處理能力
E、???????? 異步通信
?
1.2.3、消息隊(duì)列的模式
A、點(diǎn)對(duì)點(diǎn)模式
消息生產(chǎn)者發(fā)送消息到消息隊(duì)列中,然后消息消費(fèi)者從隊(duì)列中取出并且消費(fèi)消息,消息被消費(fèi)后,隊(duì)列中不在存儲(chǔ)。所以消息消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息;隊(duì)列支持存在多個(gè)消費(fèi)者,但是對(duì)于一個(gè)消息而言,只會(huì)
有一個(gè)消費(fèi)者可以消費(fèi);如果想發(fā)給多個(gè)消費(fèi)者,則需要多次發(fā)送該條消息
B】發(fā)布/訂閱模式(一對(duì)多,消費(fèi)者消費(fèi)數(shù)據(jù)之后不會(huì)清除消息)
消息生產(chǎn)者將消息發(fā)布到topic中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息,和點(diǎn)對(duì)點(diǎn)的方式不同,發(fā)布到topic的消息會(huì)被所有的訂閱者消費(fèi);但是數(shù)據(jù)保留是期限的,默認(rèn)是7天,因?yàn)樗皇谴鎯?chǔ)系統(tǒng);kafka就是這種模式的;有兩種方式,一種是是消費(fèi)者去主動(dòng)去消費(fèi)(拉?。┫ⅲ皇巧a(chǎn)者推送消息給消費(fèi)者;另外一種就是生產(chǎn)者主動(dòng)推送消息給消費(fèi)者,類似公眾號(hào)
?
1.3、kafka的基礎(chǔ)架構(gòu)
?
?
?
kafka的基礎(chǔ)架構(gòu)主要有broker、生產(chǎn)者、消費(fèi)者組構(gòu)成,當(dāng)前還包括zookeeper
生產(chǎn)者負(fù)責(zé)發(fā)送消息
broker負(fù)責(zé)緩沖消息,broker中可以創(chuàng)建topic,每個(gè)topic又有partition和replication的概念
消費(fèi)者組負(fù)責(zé)處理消息,同一個(gè)消費(fèi)者組的中消費(fèi)者不能消費(fèi)同一個(gè)partition中的數(shù)據(jù),消費(fèi)者組主要是提高消費(fèi)能力,比如之前是一個(gè)消費(fèi)者消費(fèi)100條數(shù)據(jù),現(xiàn)在是2個(gè)消費(fèi)者消費(fèi)100條數(shù)據(jù),可以提高消費(fèi)能力;所以消費(fèi)者組的消費(fèi)者的個(gè)數(shù)要小于partition的個(gè)數(shù),不然就會(huì)有消費(fèi)者沒(méi)有partition可以消費(fèi),造成資源的浪費(fèi)
注:但是不同的消費(fèi)者組的消費(fèi)者是可以消費(fèi)相同的partition數(shù)據(jù)
Kakfa如果要組件集群,則只需要注冊(cè)到一個(gè)zk中就可以了,zk中還保留消息消費(fèi)的進(jìn)度或者說(shuō)偏移量或者消費(fèi)位置
0.9版本之前偏移量存儲(chǔ)在zk
0.9版本之后偏移量存儲(chǔ)在kafka中,kafka定義了一個(gè)系統(tǒng)的topic,專用用來(lái)存儲(chǔ)偏移量的數(shù)據(jù);
為什么要改?主要是考慮到頻繁更改偏移量,對(duì)zk的壓力較大,而且kafka本身自己的處理也較復(fù)雜
?
1.4、kafka安裝
A、Kafka的安裝只需要解壓安裝包就可以完成安裝
tar -zxvf kafka_2.11-2.1.1.tgz -C /usr/local/
B、查看配置文件
[root@es1 config]# pwd /usr/local/kafka/config [root@es1 config]# ll total 84
-rw-r--r--. 1 root root 906 Feb 8 2019 connect-console-sink.properties
-rw-r--r--. 1 root root 909 Feb 8 2019 connect-console-source.properties
-rw-r--r--. 1 root root 5321 Feb 8 2019 connect-distributed.properties
-rw-r--r--. 1 root root 883 Feb 8 2019 connect-file-sink.properties -rw-r--r--.
1 root root 881 Feb 8 2019 connect-file-source.properties -rw-r--r--. 1 root
root 1111 Feb 8 2019 connect-log4j.properties -rw-r--r--. 1 root root 2262 Feb
8 2019 connect-standalone.properties -rw-r--r--. 1 root root 1221 Feb 8 2019
consumer.properties -rw-r--r--. 1 root root 4727 Feb 8 2019 log4j.properties
-rw-r--r--. 1 root root 1925 Feb 8 2019 producer.properties -rw-r--r--. 1 root
root 6865 Jan 16 22:00 server-1.properties -rw-r--r--. 1 root root 6865 Jan 16
22:00 server-2.properties -rw-r--r--. 1 root root 6873 Jan 16 03:57
server.properties -rw-r--r--. 1 root root 1032 Feb 8 2019
tools-log4j.properties -rw-r--r--. 1 root root 1169 Feb 8 2019 trogdor.conf
-rw-r--r--. 1 root root 1023 Feb 8 2019 zookeeper.properties
C、修改配置文件server.properties
設(shè)置broker.id 這個(gè)是kafka集群區(qū)分每個(gè)節(jié)點(diǎn)的唯一標(biāo)志符
?
?
?
D、設(shè)置kafka的數(shù)據(jù)存儲(chǔ)路徑
?
?
?
注:這個(gè)目錄下不能有其他非kafka的目錄,不然會(huì)導(dǎo)致kafka集群無(wú)法啟動(dòng)
E、設(shè)置是否可以刪除topic,默認(rèn)情況先kafka的topic是不允許刪除的
?
?
?
?
F、Kafka的數(shù)據(jù)保留的時(shí)間,默認(rèn)是7天
?
?
?
G、Log文件最大的大小,如果log文件超過(guò)1g會(huì)創(chuàng)建一個(gè)新的文件
?
?
?
?
?
?
H、Kafka連接的zk的地址和連接kafka的超時(shí)時(shí)間
?
?
?
J、默認(rèn)的partition的個(gè)數(shù)
?
?
?
1.5、啟動(dòng)kafka
A、啟動(dòng)方式1,kafka只能單節(jié)點(diǎn)啟動(dòng),所以每個(gè)kakfa節(jié)點(diǎn)都需要手動(dòng)啟動(dòng),下面的方式阻塞的方式啟動(dòng)
?
?
?
B、啟動(dòng)方式2,守護(hù)的方式啟動(dòng),推薦
?
?
?
1.6、kafka操作
A、查看當(dāng)前kafka集群已有的topic
?
?
?
注意:這里連接的zookeeper,而不是連接的kafka
B、創(chuàng)建topic,指定分片和副本個(gè)數(shù)
?
?
?
注:
replication-factor:副本數(shù)
replication-factor:分區(qū)數(shù)
Topic:主題名
如果當(dāng)前kafka集群只有3個(gè)broker節(jié)點(diǎn),則replication-factor最大就是3了,下面的例子創(chuàng)建副本為4,則會(huì)報(bào)錯(cuò)
?
?
?
C、刪除topic
?
?
?
D、查看topic信息
?
?
?
?
?
?
?
1.7、啟動(dòng)生產(chǎn)者生產(chǎn)消息,kafka自帶一個(gè)生產(chǎn)者和消費(fèi)者的客戶端
A、啟動(dòng)一個(gè)生產(chǎn)者,注意此時(shí)連的9092端口,連接的kafka集群
?
?
?
B、啟動(dòng)一個(gè)消費(fèi)者,注意此時(shí)連接的還是9092端口,在0.9版本之前連接的還是2181端口
?
?
?
這里我們啟動(dòng)2個(gè)消費(fèi)者來(lái)測(cè)試一下
?
?
?
注:如果不指定的消費(fèi)者組的配置文件的話,默認(rèn)每個(gè)消費(fèi)者都屬于不同的消費(fèi)者組
C、發(fā)送消息,可以看到每個(gè)消費(fèi)者都能收到消息
?
?
?
?
?
?
?
?
?
D、Kakfa中的實(shí)際的數(shù)據(jù)
?
?
?
?
?
?
二、kafka架構(gòu)深入
?
?
?
?
Kafka不能保證消息的全局有序,只能保證消息在partition內(nèi)有序,因?yàn)橄M(fèi)者消費(fèi)消息是在不同的partition中隨機(jī)的
2.1、kafka的工作流程
Kafka中的消息是以topic進(jìn)行分類的,生產(chǎn)者生成消息,消費(fèi)者消費(fèi)消息,都是面向topic的
?
?
?
?
Topic是一個(gè)邏輯上的概念,而partition是物理上的概念
每個(gè)partition又有副本的概念
每個(gè)partition對(duì)應(yīng)于一個(gè)log文件,該log文件中存儲(chǔ)的就是生產(chǎn)者生成的數(shù)據(jù),生產(chǎn)者生成的數(shù)據(jù)會(huì)不斷的追加到該log的文件末端,且每條數(shù)據(jù)都有自己的offset,消費(fèi)者都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了那個(gè)offset,以便出錯(cuò)的時(shí)候從上次的位置繼續(xù)消費(fèi),這個(gè)offset就保存在index文件中
kafka的offset是分區(qū)內(nèi)有序的,但是在不同分區(qū)中是無(wú)順序的,kafka不保證數(shù)據(jù)的全局有序
2.2、kafka原理
由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到log文件的末尾,為防止log文件過(guò)大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采用分片和索引的機(jī)制,將每個(gè)partition分為多個(gè)segment,每個(gè)segment對(duì)應(yīng)2個(gè)文件----index文件和log文件,這2個(gè)文件位于一個(gè)相同的文件夾下,文件夾的命名規(guī)則為topic名稱+分區(qū)序號(hào)
?
?
Indx和log的文件的文件名是當(dāng)前這個(gè)索引是最小的數(shù)據(jù)的offset
Kafka如何快速的消費(fèi)數(shù)據(jù)呢?
?
?
Index文件中存儲(chǔ)的數(shù)據(jù)的索引信息,第一列是offset,第二列這這個(gè)數(shù)據(jù)所對(duì)應(yīng)的log文件中的偏移量,就像我們?nèi)プx文件,使用seek()設(shè)置當(dāng)前鼠標(biāo)的位置一樣,可以更快的找到數(shù)據(jù)
如果要去消費(fèi)offset為3的數(shù)據(jù),首先通過(guò)二分法找到數(shù)據(jù)在哪個(gè)index文件中,然后在通過(guò)index中offset找到數(shù)據(jù)在log文件中的offset;這樣就可以快速的定位到數(shù)據(jù),并消費(fèi)
所以kakfa雖然把數(shù)據(jù)存儲(chǔ)在磁盤中,但是他的讀取速度還是非??斓?br>
?
三、kafka的生產(chǎn)者和消費(fèi)者
3.1、kafka的生產(chǎn)者
Kafka的partition的分區(qū)的作用
Kafka的分區(qū)的原因主要就是提供并發(fā)提高性能,因?yàn)樽x寫是partition為單位讀寫的;
那生產(chǎn)者發(fā)送消息是發(fā)送到哪個(gè)partition中呢?
A、在客戶端中指定partition
B、輪詢(推薦)消息1去p1,消息2去p2,消息3去p3,消息4去p1,消息5去p2,消息6去p3 。。。。。。。
?
3.2 kafka如何保證數(shù)據(jù)可靠性呢?通過(guò)ack來(lái)保證
為保證生產(chǎn)者發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的topic,topic的每個(gè)partition收到生產(chǎn)者發(fā)送的數(shù)據(jù)后,都需要向生產(chǎn)者發(fā)送ack(確認(rèn)收到),如果生產(chǎn)者收到ack,就會(huì)進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)
?
?
?
?
那么kafka什么時(shí)候向生產(chǎn)者發(fā)送ack
確保follower和leader同步完成,leader在發(fā)送ack給生產(chǎn)者,這樣才能確保leader掛掉之后,能再follower中選舉出新的leader后,數(shù)據(jù)不會(huì)丟失
那多少個(gè)follower同步完成后發(fā)送ack
方案1:半數(shù)已經(jīng)完成同步,就發(fā)送ack
方案2:全部完成同步,才發(fā)送ack(kafka采用這種方式)
采用第二種方案后,設(shè)想以下場(chǎng)景,leader收到數(shù)據(jù),所有的follower都開始同步數(shù)據(jù),但是有一個(gè)follower因?yàn)槟撤N故障,一直無(wú)法完成同步,那leader就要一直等下,直到他同步完成,才能發(fā)送ack,這樣就非常影響效率,這個(gè)問(wèn)題怎么解決?
?
?
Leader維護(hù)了一個(gè)動(dòng)態(tài)的ISR列表(同步副本的作用),只需要這個(gè)列表的中的follower和leader同步;當(dāng)ISR中的follower完成數(shù)據(jù)的同步之后,leader就會(huì)給生產(chǎn)者發(fā)送ack,如果follower長(zhǎng)時(shí)間未向leader同步數(shù)據(jù),則該follower將被剔除ISR,這個(gè)時(shí)間閾值也是自定義的;同樣leader故障后,就會(huì)從ISR中選舉新的leader
怎么選擇ISR的節(jié)點(diǎn)呢?
首先通信的時(shí)間要快,要和leader要可以很快的完成通信,這個(gè)時(shí)間默認(rèn)是10s
然后就看leader數(shù)據(jù)差距,消息條數(shù)默認(rèn)是10000條(后面版本被移除)
為什么移除:因?yàn)閗afka發(fā)送消息是批量發(fā)送的,所以會(huì)一瞬間leader接受完成,但是follower還沒(méi)有拉取,所以會(huì)頻繁的踢出加入ISR,這個(gè)數(shù)據(jù)會(huì)保存到zk和內(nèi)存中,所以會(huì)頻繁的更新zk和內(nèi)存。
?
但是對(duì)于某些不太重要的數(shù)據(jù),對(duì)數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒(méi)必要等ISR中的follower全部接受成功
所以kafka為用戶提供了三種可靠性級(jí)別,用戶可以根據(jù)可靠性和延遲進(jìn)行權(quán)衡,這個(gè)設(shè)置在kafka的生成中設(shè)置:acks參數(shù)設(shè)置
A、acks為0
生產(chǎn)者不等ack,只管往topic丟數(shù)據(jù)就可以了,這個(gè)丟數(shù)據(jù)的概率非常高
B、ack為1
Leader落盤后就會(huì)返回ack,會(huì)有數(shù)據(jù)丟失的現(xiàn)象,如果leader在同步完成后出現(xiàn)故障,則會(huì)出現(xiàn)數(shù)據(jù)丟失
C、ack為-1(all)
Leader和follower(ISR)落盤才會(huì)返回ack,會(huì)有數(shù)據(jù)重復(fù)現(xiàn)象,如果在leader已經(jīng)寫完成,且follower同步完成,但是在返回ack的出現(xiàn)故障,則會(huì)出現(xiàn)數(shù)據(jù)重復(fù)現(xiàn)象;極限情況下,這個(gè)也會(huì)有數(shù)據(jù)丟失的情況,比如follower和leader通信都很慢,所以ISR中只有一個(gè)leader節(jié)點(diǎn),這個(gè)時(shí)候,leader完成落盤,就會(huì)返回ack,如果此時(shí)leader故障后,就會(huì)導(dǎo)致丟失數(shù)據(jù)
?
3.3 Kafka如何保證消費(fèi)數(shù)據(jù)的一致性?通過(guò)HW來(lái)保證
?
?
?
LEO:指每個(gè)follower的最大的offset
HW(高水位):指消費(fèi)者能見到的最大的offset,LSR隊(duì)列中最小的LEO,也就是說(shuō)消費(fèi)者只能看到1~6的數(shù)據(jù),后面的數(shù)據(jù)看不到,也消費(fèi)不了
避免leader掛掉后,比如當(dāng)前消費(fèi)者消費(fèi)8這條數(shù)據(jù)后,leader掛??
了,此時(shí)比如f2成為leader,f2根本就沒(méi)有9這條數(shù)據(jù),那么消費(fèi)者就會(huì)報(bào)錯(cuò),所以設(shè)計(jì)了HW這個(gè)參數(shù),只暴露最少的數(shù)據(jù)給消費(fèi)者,避免上面的問(wèn)題
?
3.3.1、HW保證數(shù)據(jù)存儲(chǔ)的一致性
A、Follower故障
Follower發(fā)生故障后會(huì)被臨時(shí)提出LSR,待該follower恢復(fù)后,follower會(huì)讀取本地的磁盤記錄的上次的HW,并將該log文件高于HW的部分截取掉,從HW開始想leader進(jìn)行同步,等該follower的LEO大于等于該P(yáng)artition的hw,即follower追上leader后,就可以重新加入LSR
B、Leader故障
Leader發(fā)生故障后,會(huì)從ISR中選出一個(gè)新的leader,之后,為了保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的follower會(huì)先將各自的log文件高于hw的部分截掉(新leader自己不會(huì)截掉),然后從新的leader同步數(shù)據(jù)
注意:這個(gè)是為了保證多個(gè)副本間的數(shù)據(jù)存儲(chǔ)的一致性,并不能保證數(shù)據(jù)不丟失或者不重復(fù)
?
3.3.2精準(zhǔn)一次(冪等性),保證數(shù)據(jù)不重復(fù)
?
Ack設(shè)置為-1,則可以保證數(shù)據(jù)不丟失,但是會(huì)出現(xiàn)數(shù)據(jù)重復(fù)(at least once)
Ack設(shè)置為0,則可以保證數(shù)據(jù)不重復(fù),但是不能保證數(shù)據(jù)不丟失(at most once)
但是如果魚和熊掌兼得,該怎么辦?這個(gè)時(shí)候就就引入了Exactl once(精準(zhǔn)一次)
?
在0.11版本后,引入冪等性解決kakfa集群內(nèi)部的數(shù)據(jù)重復(fù),在0.11版本之前,在消費(fèi)者處自己做處理
如果啟用了冪等性,則ack默認(rèn)就是-1,kafka就會(huì)為每個(gè)生產(chǎn)者分配一個(gè)pid,并未每條消息分配seqnumber,如果pid、partition、seqnumber三者一樣,則kafka認(rèn)為是重復(fù)數(shù)據(jù),就不會(huì)落盤保存;但是如果生產(chǎn)者掛掉后,也會(huì)出現(xiàn)有數(shù)據(jù)重復(fù)的現(xiàn)象;所以冪等性解決在單次會(huì)話的單個(gè)分區(qū)的數(shù)據(jù)重復(fù),但是在分區(qū)間或者跨會(huì)話的是數(shù)據(jù)重復(fù)的是無(wú)法解決的
3.4 kafka的消費(fèi)者
3.4.1 消費(fèi)方式
消息隊(duì)列有兩種消費(fèi)消息的方式,push(微信公眾號(hào))、pull(kafka),push模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄M(fèi)發(fā)送速率是由broker決定的,他的目標(biāo)是盡可能以最快的的速度傳遞消息,但是這樣很容易造成消費(fèi)者來(lái)不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull的方式可以消費(fèi)者的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息
Pull的模式不足之處是如果kafka沒(méi)有數(shù)據(jù),消費(fèi)者可能會(huì)陷入死循環(huán),一直返回空數(shù)據(jù),針對(duì)這一點(diǎn),kafka的消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)候回傳遞一個(gè)timeout參數(shù),如果當(dāng)時(shí)沒(méi)有數(shù)據(jù)可供消費(fèi),消費(fèi)者會(huì)等待一段時(shí)間在返回
3.4.2 分區(qū)分配策略
一個(gè)消費(fèi)者組有多個(gè)消費(fèi)者,一個(gè)topic有多個(gè)partition。所以必然會(huì)涉及到partition的分配問(wèn)題,即確定哪個(gè)partition由哪個(gè)消費(fèi)者來(lái)消費(fèi)
Kafka提供兩種方式,一種是輪詢(RountRobin)對(duì)于topic組生效,一種是(Range)對(duì)于單個(gè)topic生效
?
輪訓(xùn):前置條件是需要一個(gè)消費(fèi)者里的消費(fèi)者訂閱的是相同的topic。不然就會(huì)出現(xiàn)問(wèn)題;非默認(rèn)的的方式
?
同一個(gè)消費(fèi)者組里的消費(fèi)者不能同時(shí)消費(fèi)同一個(gè)分區(qū)
比如三個(gè)消費(fèi)者消費(fèi)一個(gè)topic的9個(gè)分區(qū)
?
?
?
?
如果一個(gè)消費(fèi)者組里有2個(gè)消費(fèi)者,這個(gè)消費(fèi)者組里同時(shí)消費(fèi)2個(gè)topic,每個(gè)topic又有三個(gè)partition
首先會(huì)把2個(gè)topic當(dāng)做一個(gè)主題,然后根據(jù)topic和partition做hash,然后在按照hash排序。然后輪訓(xùn)分配給一個(gè)消費(fèi)者組中的2個(gè)消費(fèi)者
?
如果是下面這樣的方式訂閱的呢?
比如有3個(gè)topic,每個(gè)topic有3個(gè)partition,一個(gè)消費(fèi)者組中有2個(gè)消費(fèi)者。消費(fèi)者1訂閱topic1和topic2,消費(fèi)者2訂閱topic2和topic3,那么這樣的場(chǎng)景,使用輪訓(xùn)的方式訂閱topic就會(huì)有問(wèn)題
?
如果是下面這種方式訂閱呢
比如有2個(gè)topic,每個(gè)topic有3個(gè)partition,一個(gè)消費(fèi)者組
有2個(gè)消費(fèi)者,消費(fèi)者1訂閱topic1,消費(fèi)者2訂閱topic2,這樣使用輪訓(xùn)的方式訂閱topic也會(huì)有問(wèn)題
?
所以我們一直強(qiáng)調(diào),使用輪訓(xùn)的方式訂閱topic的前提是一個(gè)消費(fèi)者組中的所有消費(fèi)者訂閱的主題是一樣的;
所以輪訓(xùn)的方式不是kafka默認(rèn)的方式
Range:是按照單個(gè)topic來(lái)劃分的,默認(rèn)的分配方式
?
?
?
?
?
Range的問(wèn)題會(huì)出現(xiàn)消費(fèi)者數(shù)據(jù)不均衡的問(wèn)題
比如下面的例子,一個(gè)消費(fèi)者組訂閱了2個(gè)topic,就會(huì)出現(xiàn)消費(fèi)者1消費(fèi)4個(gè)partition,而另外一個(gè)消費(fèi)者只消費(fèi)2個(gè)partition
?
?
分區(qū)策略什么時(shí)候會(huì)觸發(fā)呢?當(dāng)消費(fèi)者組里的消費(fèi)者個(gè)數(shù)變化的時(shí)候,會(huì)觸發(fā)分區(qū)策略調(diào)整,比如消費(fèi)者里增加消費(fèi)者,或者減少消費(fèi)者
3.4.3 offset的維護(hù)
由于消費(fèi)者在消費(fèi)過(guò)程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,消費(fèi)者恢復(fù)后,需要從故障前的位置繼續(xù)消費(fèi),所以消費(fèi)者需要實(shí)施記錄自己消費(fèi)哪個(gè)offset,以便故障恢復(fù)后繼續(xù)消費(fèi)
Offset保存的位置有2個(gè),一個(gè)zk,一個(gè)是kafka
首先看下offset保存到zk
由消費(fèi)者組、topic、partition三個(gè)元素確定唯一的offset
?
所以消費(fèi)者組中的某個(gè)消費(fèi)者掛掉之后,或者的消費(fèi)者還是可以拿到這個(gè)offset的
?
?
Controller這個(gè)節(jié)點(diǎn)和zk通信,同步數(shù)據(jù),這個(gè)節(jié)點(diǎn)就是誰(shuí)先起來(lái),誰(shuí)就先注冊(cè)controller,誰(shuí)就是controller。其他節(jié)點(diǎn)和controller信息保持同步
?
3.4.5、消費(fèi)者組的案例
修改消費(fèi)者組id
?
?
啟動(dòng)一個(gè)消費(fèi)者發(fā)送3條數(shù)據(jù)
?
?
指定消費(fèi)者組啟動(dòng)消費(fèi)者,啟動(dòng)三個(gè)消費(fèi)者,可以看到每個(gè)消費(fèi)者消費(fèi)了一條數(shù)據(jù)
?
?
?
?
?
?
?
在演示下不同組可以消費(fèi)同一個(gè)topic的,我們看到2個(gè)消費(fèi)者的消費(fèi)者都消費(fèi)到同一條數(shù)據(jù)
再次啟動(dòng)一個(gè)消費(fèi)者,這個(gè)消費(fèi)者屬于另外一個(gè)消費(fèi)者組
?
?
?
?
四、Kafka的高效讀寫機(jī)制
4.1、分布式部署
多節(jié)點(diǎn)并行操作
?
4.2、順序?qū)懘疟P
Kafka的producer生產(chǎn)數(shù)據(jù),要寫入到log文件中,寫的過(guò)程中一直追加到文件末尾,為順序?qū)?,官網(wǎng)有數(shù)據(jù)表明。同樣的磁盤,順序?qū)懩艿?00M/S,而隨機(jī)寫只有100K/S。這與磁盤的機(jī)械結(jié)構(gòu)有關(guān),順序?qū)懼钥?,是因?yàn)槠涫∪チ舜罅看蓬^尋址的時(shí)間
4.3、零復(fù)制技術(shù)
正常情況下,先把數(shù)據(jù)讀到內(nèi)核空間,在從內(nèi)核空間把數(shù)據(jù)讀到用戶空間,然后在調(diào)操作系統(tǒng)的io接口寫到內(nèi)核空間,最終在寫到硬盤中
?
?
Kafka是這樣做的,直接在內(nèi)核空間流轉(zhuǎn)io流,所以kafka的性能非常高
?
?
?
?
五、 zookeeper在kafka中的作用
Kafka集群中有一個(gè)broker會(huì)被選舉為controller,負(fù)責(zé)管理集群broker的上下線,所有的topic的分區(qū)副本分配和leader選舉等工作
?
熱門工具 換一換
