Apache Pulsar
Pulsar是一個(gè)支持多租戶(hù)的、高性能的服務(wù)與服務(wù)之間消息通訊的解決方案,最初由雅虎開(kāi)發(fā),現(xiàn)在由Apache軟件基金會(huì)管理。?
Pulsar在Yahoo的生產(chǎn)環(huán)境運(yùn)行了三年多,助力Yahoo的主要應(yīng)用,如Yahoo Mail、Yahoo Finance、Yahoo
Sports、Flickr、Gemini廣告平臺(tái)和Yahoo分布式鍵值存儲(chǔ)系統(tǒng)Sherpa。
?Kafka不夠好,智聯(lián)招聘基于Pulsar打造企業(yè)級(jí)事件中心。
Pulsar的主要特性如下:
* Pulsar實(shí)例原生支持多集群,能無(wú)縫的基于地理位置進(jìn)行跨集群備份
* 非常低的消息發(fā)布和端到端的延遲
* 無(wú)縫擴(kuò)展到超過(guò)百萬(wàn)個(gè)topic
* 支持Java,Go,Pytho和C++的客戶(hù)端
* Topic支持多種訂閱模式: 獨(dú)占(exclusive), 共享(shared)和災(zāi)備(failover)
* 通過(guò)Apache BookKeeper提供的持久化消息存儲(chǔ)機(jī)制保證消息的送達(dá)
* serverless的輕量級(jí)計(jì)算框架Pulsar Functions提供了原生的流數(shù)據(jù)處理
* serverless的連接器框架Pulsar IO構(gòu)建于 Pulsar Functions之上,能夠輕松的將數(shù)據(jù)從Pulsar中移入和移出
* 當(dāng)數(shù)據(jù)老化時(shí),分層存儲(chǔ)將數(shù)據(jù)從熱存儲(chǔ)卸載到冷存儲(chǔ)(如S3和GCS)
1. 消息系統(tǒng)概念
Pulsar采用了發(fā)布訂閱的設(shè)計(jì)模式,也稱(chēng)作pub-sub。該設(shè)計(jì)模式中,producer發(fā)布消息到topic,consumer可以訂閱這些topic,處理發(fā)布過(guò)來(lái)的消息,在處理完成后發(fā)送確認(rèn)。
一旦訂閱被創(chuàng)建,所有的消息都將被Pulsar保留,即使consumer斷開(kāi)連接。 只有在consumer確認(rèn)消息被成功處理后,保留下來(lái)的消息才會(huì)被丟棄。
1.1 Messages
消息是Pulsar的基礎(chǔ)單元。 消息就是producer發(fā)給topic的內(nèi)容,以及consumer從topic消費(fèi)的內(nèi)容(消息處理完成后發(fā)送確認(rèn))。
消息類(lèi)似于郵政系統(tǒng)中的信件。
消息包含了多個(gè)屬性:Value(數(shù)據(jù)),Key(打標(biāo)簽,用來(lái)壓縮消息),Properties(可選,用戶(hù)自定義key/value),Producer
name(生產(chǎn)者名稱(chēng),可默認(rèn)生成,也可指定),Sequence ID(消息的序列id),Publish time(發(fā)布時(shí)間,生產(chǎn)者自動(dòng)加上),Event
time(消息的可選時(shí)間戳)。
1.2 Producers
生產(chǎn)者是關(guān)聯(lián)topic的程序,它發(fā)布消息到Pulsar的broker上。
發(fā)送模式:Producer可以以同步(sync)或者異步(async)的方式發(fā)布消息到broker。
* 同步發(fā)送:producer發(fā)送每條消息后會(huì)等待broker的確認(rèn),如果沒(méi)有收到確認(rèn)信息,producer會(huì)認(rèn)為發(fā)送失敗
* 異步發(fā)送:Producer將會(huì)把消息放入blocking隊(duì)列,然后馬上返回。 然后客戶(hù)端在后臺(tái)將消息發(fā)送給broker。如果隊(duì)列已滿(mǎn)(
配置的最大數(shù)量),根據(jù)傳入producer的參數(shù),producer可能阻塞或者直接返回失敗。
壓縮:消息在發(fā)送過(guò)程中可以被壓縮來(lái)節(jié)省帶寬,pulsar支持LZ4,ZLIB,ZSTD,SNAPPY類(lèi)型。
批處理:如果啟用了批處理,生產(chǎn)者將在單個(gè)請(qǐng)求中發(fā)送批量消息。批處理大小由最大消息數(shù)和最大發(fā)布延遲決定。
1.3 Consumers
消費(fèi)者是訂閱關(guān)聯(lián)topic,然后接收消息的程序。
接收模式:消息可以通過(guò)同步或者異步的方式從broker接收。
* 同步接收:同步接收將會(huì)阻塞,直到消息可用
* 異步接收:異步接收立即返回future值,例如java中的CompletableFuture,一旦新消息可用,它立即完成。
監(jiān)聽(tīng)
:客戶(hù)端庫(kù)為consumers提供listener的實(shí)現(xiàn),例如Java客戶(hù)端,提供MesssageListener接口,實(shí)現(xiàn)該接口,一旦接受到新的消息,received方法將被調(diào)用。
void received(Consumer<T> consumer,Message<T> msg);
?
確認(rèn):當(dāng)一個(gè)consumer 成功消費(fèi)掉一條消息后,那么這個(gè)consumer會(huì)發(fā)送一個(gè)確認(rèn)請(qǐng)求到broker,broker會(huì)丟棄這條消息,否則保存這條消息。
消息的確認(rèn)可以一個(gè)接一個(gè),也可以累積一起。 累積確認(rèn)時(shí),消費(fèi)者只需要確認(rèn)最后一條他收到的消息。 所有之前(包含此條)的消息,都不會(huì)被再次重發(fā)給那個(gè)消費(fèi)者。
累積消息確認(rèn)不能用于共享訂閱模式,因?yàn)楣蚕砟J街?,一個(gè)訂閱會(huì)涉及到多個(gè)消費(fèi)者。
共享模式中,多條消息可以單獨(dú)確認(rèn)。
否定確認(rèn):當(dāng)consumer
在一定時(shí)間內(nèi)沒(méi)有成功消費(fèi)消息,而想再次消費(fèi)該條消息,那么這個(gè)consumer可以發(fā)送一個(gè)否定確認(rèn)到broker,然后broker重發(fā)這條消息。消息可以一條接一條的否定確認(rèn),也可以累積否定確認(rèn),這取決于消費(fèi)訂閱模式。在獨(dú)占和災(zāi)備模式,消費(fèi)者只能否定確認(rèn)其接收的最后一條消息。在共享模式,消費(fèi)者可以獨(dú)立否定確認(rèn)。
確認(rèn)超時(shí)
:當(dāng)一條消息沒(méi)有被成功消費(fèi),并且您想要觸發(fā)broker自動(dòng)重發(fā)消息時(shí),您可以采用未確認(rèn)消息自動(dòng)重發(fā)機(jī)制??蛻?hù)端將在整個(gè)AckTimeout時(shí)間范圍內(nèi)跟蹤未確認(rèn)的消息,并在指定確認(rèn)超時(shí)時(shí)間自動(dòng)向broker發(fā)送重發(fā)未確認(rèn)的消息請(qǐng)求。
在確認(rèn)超時(shí)之前使用否定確認(rèn)。否定確認(rèn)以更精確的方式控制單個(gè)消息的重發(fā),并在消息處理時(shí)間超過(guò)確認(rèn)超時(shí)時(shí)間后,避免無(wú)效的重發(fā)消息。
死信(Dead letter)topic
:死信topic使您能夠在消費(fèi)者無(wú)法成功消費(fèi)某些消息時(shí)消費(fèi)新消息。在這種機(jī)制中,無(wú)法消費(fèi)的消息存儲(chǔ)在單獨(dú)的topic,稱(chēng)為死信topic。您可以決定如何處理死信topic中的消息。
在Java客戶(hù)端中,可以使用以下例子處理死信topic:
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic) .subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount) .build()) .subscribe();
?
死信topic依賴(lài)于消息的重發(fā)。您需要確認(rèn)消息的重發(fā)方法:否定確認(rèn)或確認(rèn)超時(shí)。在確認(rèn)超時(shí)之前使用否定確認(rèn)。
目前,死信topic僅適用于共享模式。
1.4 Topics
和其他的發(fā)布訂閱系統(tǒng)一樣,Pulsar 中的 topic 是被命名的通道,用做從producer到 consumer傳輸消息。
Topic的名稱(chēng)是具有明確定義結(jié)構(gòu)的URL:
{persistent|non-persistent}://tenant/namespace/topic
?
persistent/non-persistent
:topic的類(lèi)型,包括持久化和非持久化(默認(rèn)是持久類(lèi)型)。topic指定持久化后,所有的消息會(huì)持久化到硬盤(pán)(這意味著多塊硬盤(pán),除非是單機(jī)模式的broker)。反之,非持久topic的數(shù)據(jù)不會(huì)存儲(chǔ)到硬盤(pán)上。
tenant:topic在實(shí)例中的租戶(hù),租戶(hù)對(duì)于Pulsar的多租戶(hù)來(lái)說(shuō)是必不可少的,可以分布在多個(gè)集群中。
namespace:Topic的管理單元,充當(dāng)關(guān)聯(lián)topic組的管理機(jī)制。 大多數(shù)的topic配置在namespace層面生效。
每個(gè)tenant可以有多個(gè)namespace。
topic:topic名稱(chēng)是自由定義的,在pulsar實(shí)例中無(wú)特殊意義。
1.4.1 namespace
命名空間是租戶(hù)內(nèi)部邏輯上的命名術(shù)語(yǔ)。 一個(gè)租戶(hù)可以通過(guò)admin API創(chuàng)建多個(gè)命名空間。
例如,一個(gè)對(duì)接多個(gè)應(yīng)用的租戶(hù),可以為每個(gè)應(yīng)用創(chuàng)建不同的namespace。 Namespace使得程序可以以層級(jí)的方式創(chuàng)建和管理topic。
例如:"my-tenant/app1" ,它的namespace是app1這個(gè)應(yīng)用,對(duì)應(yīng)的租戶(hù)是 my-tenant。
你可以在namespace下創(chuàng)建任意數(shù)量的topic。
1.4.2 訂閱模型
訂閱是命名好的配置規(guī)則,用于確定如何將消息發(fā)給消費(fèi)者。Pulsar有三種訂閱模式:exclusive(獨(dú)占),shared(共享),failover(災(zāi)備)。
下圖展示了這三種模式:
1.4.2.1 Exclusive
獨(dú)占模式,只能有一個(gè)消費(fèi)者訂閱topic。 如果多于一個(gè)消費(fèi)者嘗試以同樣方式去訂閱topic,消費(fèi)者將會(huì)收到錯(cuò)誤。
上面的圖中,只有Consumer A可以消費(fèi)。
Exclusive模式為默認(rèn)訂閱模式。
1.4.2.2 Failover
災(zāi)備模式,多個(gè)consumer可以綁定到同一個(gè)訂閱。Consumer將會(huì)按字典順序排序,第一個(gè)consumer被初始化為唯一接受消息的消費(fèi)者,這個(gè)consumer被稱(chēng)為master
consumer。
當(dāng)master consumer斷開(kāi)時(shí),所有的消息(未被確認(rèn)和后續(xù)進(jìn)入的)將會(huì)被分發(fā)給隊(duì)列中的下一個(gè)consumer。 下圖中,Consumer
B-0是master consumer,當(dāng)Consumer B-0斷開(kāi)連接時(shí),由于Consumer B-1在隊(duì)列中下一個(gè)位置,那么它將會(huì)開(kāi)始接收消息。
1.4.2.3 Shared
共享模式,多個(gè)消費(fèi)者可以綁定到同一個(gè)訂閱上。 消息通過(guò)round
robin輪詢(xún)機(jī)制分發(fā)給不同的消費(fèi)者,并且每個(gè)消息僅會(huì)被分發(fā)給一個(gè)消費(fèi)者。當(dāng)消費(fèi)者斷開(kāi)連接,所有被發(fā)送給他,但沒(méi)有被確認(rèn)的消息將被重新安排,分發(fā)給其它存活的消費(fèi)者。
下圖中,topic下有5條消息,m0~m4,消費(fèi)者有C1/C2/C3,最終m0和m3分配給C1,m1分給C2,m2和m4分給C3,可以說(shuō)明每個(gè)消息僅發(fā)給一個(gè)消費(fèi)者。
Shared模式的限制: 有兩點(diǎn)需注意,1、不保證消息順序; 2、不能使用累計(jì)確認(rèn)
Key_shared:
在Key-shared模式下,多個(gè)消費(fèi)者可以關(guān)聯(lián)到同一訂閱。消息以分布式在消費(fèi)者之間傳遞,具有相同key/orderingKey
的消息僅傳遞給一個(gè)消費(fèi)者。無(wú)論消息被重發(fā)多少次,它都發(fā)給同一個(gè)消費(fèi)者。當(dāng)消費(fèi)者連接或斷開(kāi)連接時(shí),將導(dǎo)致某些消息的key的消費(fèi)者變更。
該模式限制:消息必須指定key/orderingKey;不能使用累計(jì)確認(rèn);該模式目前是測(cè)試版,可以在broker.config禁用。
1.5 多topic訂閱
當(dāng)consumer訂閱pulsar的topic時(shí),它默認(rèn)指定訂閱了一個(gè)topic,例如:
persistent://public/default/my-topic。
從Pulsar的1.23.0-incubating的版本開(kāi)始,Pulsar消費(fèi)者可以同時(shí)訂閱多個(gè)topic。 你可以用以下兩種方式定義topic的列表:
* 通過(guò)最基礎(chǔ)的正則表達(dá)式(regex),例如 persistent://public/default/finance-.*
* 通過(guò)明確指定的topic列表
通過(guò)正則訂閱多主題時(shí),所有的主題必須在同一個(gè)namespace。
當(dāng)訂閱多主題時(shí),Pulsar客戶(hù)端會(huì)自動(dòng)調(diào)用Pulsar的API來(lái)發(fā)現(xiàn)匹配表達(dá)式或者列表的所有topic,然后全部訂閱。
如果此時(shí)有暫不存在的topic,那么一旦這些topic被創(chuàng)建,conusmer會(huì)自動(dòng)訂閱。
不能保證順序性 當(dāng)消費(fèi)者訂閱多topic時(shí),Pulsar所提供對(duì)單一topic訂閱的順序保證,就hold不住了。
如果你在使用Pulsar的時(shí)候,遇到必須保證順序的需求,強(qiáng)烈建議不要使用此特性。
下面是多主題訂閱在java中的例子:
import java.util.regex.Pattern; import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.PulsarClient; PulsarClient pulsarClient = //
實(shí)例化pulsar客戶(hù)端// 訂閱一個(gè)namespace下的所有topic Pattern allTopicsInNamespace =
Pattern.compile("persistent://public/default/.*"); Consumer<byte[]>
allTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(allTopicsInNamespace) .subscriptionName("subscription-1")
.subscribe();// 根據(jù)正則訂閱一個(gè)namespace下的多個(gè)topic Pattern someTopicsInNamespace =
Pattern.compile("persistent://public/default/foo.*"); Consumer<byte[]>
someTopicsConsumer = pulsarClient.newConsumer()
.topicsPattern(someTopicsInNamespace) .subscriptionName("subscription-1")
.subscribe();
?
1.6 Partitioned topics(分區(qū)topic)
通常一個(gè)topic僅被一個(gè)broker服務(wù),這限制了topic的最大吞吐量。
分區(qū)topic是特殊的topic類(lèi)型,他可以被多個(gè)broker處理,這讓topic有更高的吞吐量。
其實(shí)在背后,分區(qū)的topic通過(guò)N個(gè)內(nèi)部topic實(shí)現(xiàn),N是分區(qū)的數(shù)量。 當(dāng)向分區(qū)的topic發(fā)送消息,每條消息被路由到其中一個(gè)broker。
Pulsar自動(dòng)處理跨broker的分區(qū)分布。
下圖對(duì)此做了闡明:
分析上圖可知,Topic1有5個(gè)分區(qū)(P0到P4),分布在3個(gè)broker上。因?yàn)榉謪^(qū)數(shù)量多于broker數(shù)量,其中有兩個(gè)broker每個(gè)處理兩個(gè)分區(qū),第三個(gè)broker則只處理一個(gè)。(再次強(qiáng)調(diào),分區(qū)的分布是Pulsar自動(dòng)處理的)。
這個(gè)topic的消息被廣播給兩個(gè)consumer,路由模式?jīng)Q定哪個(gè)broker處理哪個(gè)partition,訂閱模式?jīng)Q定哪條消息發(fā)送到哪個(gè)consumer。
大多數(shù)境況下,路由和訂閱模式可以分開(kāi)制定。通常來(lái)講,吞吐能力的要求,決定了分區(qū)/路 的方式。訂閱模式則應(yīng)該由應(yīng)用來(lái)做決定。
分區(qū)topic和普通topic,對(duì)于訂閱模式如何工作,沒(méi)有任何不同。分區(qū)只是決定了從生產(chǎn)者生產(chǎn)消息到消費(fèi)者處理及確認(rèn)消息過(guò)程中發(fā)生的事情。
分區(qū)topic需要通過(guò)admin API顯式創(chuàng)建,創(chuàng)建topic時(shí)可以指定分區(qū)數(shù)。
1.6.1 路由模式
發(fā)布到分區(qū)主題時(shí),必須指定路由模式。路由模式?jīng)Q定每個(gè)消息應(yīng)該發(fā)布到哪個(gè)分區(qū),即哪個(gè)內(nèi)部主題。三種路由模式如下:
*
RoundRobinPartition:如果沒(méi)有key,所有的消息通過(guò)輪詢(xún)方式被路由到不同的分區(qū),以達(dá)到最大吞吐量。請(qǐng)注意round-robin并不是作用于每條單獨(dú)的消息,而是作用于延遲處理的批次邊界,以確保批處理有效。
如果為message指定了key,分區(qū)的producer會(huì)把key做hash,然后分配消息到指定的分區(qū)。 這是默認(rèn)的模式。
* SinglePartition:如果沒(méi)有key被提供,producer將會(huì)隨機(jī)選擇一個(gè)分區(qū),把所有的消息發(fā)往該分區(qū)。
如果為message指定了key,分區(qū)的producer會(huì)把key做hash,然后分配消息到指定的分區(qū)。
* CustomPartition:使用客制化消息路由實(shí)現(xiàn),可以決定特定的消息進(jìn)入指定的分區(qū)。 用戶(hù)可以創(chuàng)建客制化的路由模式,通過(guò)使用 Java
client ,實(shí)現(xiàn)MessageRouter接口。
1.7 順序保證
消息的順序與消息路由模式和消息的key有關(guān)。通常,用戶(hù)需要對(duì)每個(gè)key分區(qū)的消息保證順序。
當(dāng)使用
SinglePartition或者RoundRobinPartition模式時(shí),如果消息有key,消息將會(huì)被路由到匹配的分區(qū),這是基于ProducerBuilder
中HashingScheme 指定的散列shema。
順序保證有兩種方式:
* 按key分區(qū):所有擁有相同key的消息有序,
并且會(huì)被發(fā)送至相同的partition。使用SinglePartition或RoundRobinPartition模式, 每條消息都需要有key。
* 按producer:來(lái)自于相同producer的消息有序,路由策略為SinglePartition, 且每條消息都沒(méi)有key。
1.7.1 HashingScheme
HashingScheme 是代表一組標(biāo)準(zhǔn)散列函數(shù)的枚舉,為一個(gè)指定消息選擇分區(qū)時(shí)使用。 有兩種可用的散列函數(shù):JavaStringHash 和Murmur3
32Hash,producer
的默認(rèn)hash函數(shù)是JavaStringHash。請(qǐng)注意,當(dāng)producer可能來(lái)自于不同語(yǔ)言客戶(hù)端時(shí),JavaStringHash是不起作用的。建議使用Murmur3
32Hash。
1.8 非持久topic
默認(rèn)情況下,Pulsar保存所有沒(méi)有確認(rèn)的消息到多個(gè)BookKeeper的bookies中(存儲(chǔ)節(jié)點(diǎn))。持久topic的消息數(shù)據(jù)可以在broker重啟或者訂閱者出問(wèn)題的情況下存活下來(lái)。
因此,持久性topic上的消息數(shù)據(jù)可以在 broker 重啟和訂閱者故障轉(zhuǎn)移之后繼續(xù)存在。
但是,Pulsar還支持非持久性topic,這些topic的消息從不持久化存儲(chǔ)到磁盤(pán),只存在于內(nèi)存中。
Pulsar也提供了非持久topic。非持久topic的消息不會(huì)被保存在硬盤(pán)上,只存活于內(nèi)存中。當(dāng)使用非持久topic分發(fā)時(shí),關(guān)掉Pulsar的broker或者關(guān)閉訂閱者,此topic(
non-persistent))上所有的瞬時(shí)消息都會(huì)丟失,意味著客戶(hù)端可能會(huì)遇到消息缺失。
非持久性topic具有這種形式的名稱(chēng)(注意名稱(chēng)中的 non-persistent):
non-persistent://tenant/namespace/topic
?
非持久topic中,broker會(huì)立即發(fā)布消息給所有連接的訂閱者,而不會(huì)在BookKeeper中存儲(chǔ)。
如果有一個(gè)訂閱者斷開(kāi)連接,broker將無(wú)法重發(fā)這些瞬時(shí)消息,訂閱者將永遠(yuǎn)也不能收到這些消息了。
去掉持久化存儲(chǔ)的步驟,在某些情況下,使得非持久topic的消息比持久topic稍微變快。但是同時(shí),Pulsar的一些核心優(yōu)勢(shì)也喪失掉了。
非持久topic,消息數(shù)據(jù)僅存活在內(nèi)存。 如果broker掛掉或者因其他情況不能從內(nèi)存取到,你的消息數(shù)據(jù)就可能丟失。
只有在真的確信你的使用場(chǎng)景符合,并且你可以忍受時(shí),才可去使用非持久topic。
默認(rèn)非持久topic在broker上是開(kāi)啟的。 你可以通過(guò)broker的配置關(guān)閉。
你可以通過(guò)使用pulsar-admin-topics接口管理非持久topic。
1.8.1 性能
非持久消息通常比持久消息更快,因?yàn)閎roker無(wú)須持久化消息,當(dāng)消息被分發(fā)給所有訂閱者時(shí),會(huì)立即發(fā)送ack給producer。
非持久topic讓producer有更低的發(fā)布延遲。
1.8.2 客戶(hù)端API
Producer和consumer連接持久topic和連接到非持久topic的方式是一樣的。非持久的區(qū)別在于,topic的名稱(chēng)必須以non-persistent開(kāi)頭。
三種訂閱模式--exclusive,shared,failover對(duì)于非持久topic都是支持的。
下面是一個(gè)非持久topic的java consumer例子:
PulsarClient client = PulsarClient.builder() .serviceUrl(
"pulsar://localhost:6650") .build(); String npTopic =
"non-persistent://public/default/my-topic";//這里表明是非持久化 String subscriptionName
= "my-subscription-name"; Consumer<byte[]> consumer = client.newConsumer()
.topic(npTopic) .subscriptionName(subscriptionName) .subscribe();
?
這里還有一個(gè)非持久topic的java producer例子:
Producer<byte[]> producer = client.newProducer() .topic(npTopic) .create();
?
1.9 消息保留和到期(retention and expiry)
Pulsar broker默認(rèn)如下:
* 立即刪除所有已經(jīng)被cunsumer確認(rèn)過(guò)的的消息
* 以消息backlog的形式,持久保存所有的未被確認(rèn)消息
Pulsar有兩個(gè)特性,讓你可以覆蓋上面的默認(rèn)行為:
* 消息存留讓你可以保存consumer確認(rèn)過(guò)的消息
* 消息過(guò)期讓你可以給未被確認(rèn)的消息設(shè)置存活時(shí)長(zhǎng)(TTL) 所有消息保留和到期都在namespace級(jí)別進(jìn)行管理。有關(guān)操作方法,請(qǐng)參閱Message
retention and expiry cookbook。
下圖說(shuō)明了這兩種概念: 圖中第一個(gè)是消息存留,存留規(guī)則會(huì)被用于某namespace下所有的topic,指明哪些消息會(huì)被持久存儲(chǔ),即使已經(jīng)被確認(rèn)過(guò)。
沒(méi)有被留存規(guī)則覆蓋的消息將會(huì)被刪除。 沒(méi)有留存規(guī)則的話,所有被確認(rèn)的消息都會(huì)被刪除。
圖中第二個(gè)是消息過(guò)期,有些消息即使還沒(méi)有被確認(rèn),也被刪除掉了。因?yàn)楦鶕?jù)設(shè)置在namespace上的TTL,他們已經(jīng)過(guò)期了。(例如,TTL為5分鐘,過(guò)了十分鐘消息還沒(méi)被確認(rèn))
1.10 重復(fù)數(shù)據(jù)消除(Message deduplication)
當(dāng)消息被Pulsar持久化多于一次的時(shí)候,消息就會(huì)重復(fù)。 消息去重是Pulsar可選的特性,阻止不必要的消息重復(fù),每條消息僅處理一次,即使消息被接收多次。
下圖說(shuō)明了禁用和啟用重復(fù)數(shù)據(jù)消除的情況:
上圖第一個(gè)場(chǎng)景中,消息去重被關(guān)閉。 Producer發(fā)布消息1到一個(gè)topic,消息到達(dá)broker后,被持久化到BookKeeper。
然后producer又發(fā)送了消息1(可能因?yàn)槟承┲卦囘壿嫞?,然后消息被接收后又持久化在BookKeeper,這意味著消息重復(fù)發(fā)生了。
在第二個(gè)場(chǎng)景中,producer發(fā)送了消息1,消息被broker接收然后持久化,和第一個(gè)場(chǎng)景是一樣的。
當(dāng)producer再次發(fā)送消息時(shí),broker知道已經(jīng)收到個(gè)消息1,所以不會(huì)再持久化消息1。
消息重復(fù)數(shù)據(jù)消除是在namespace級(jí)別處理的。
1.10.1 生產(chǎn)者冪等
消息去重的另外一種方法是確保每條消息僅生產(chǎn)一次。 這種方法通常被叫做生產(chǎn)者冪等。 這種方式的缺點(diǎn)是,把消息去重的工作推給了應(yīng)用去做。
在Pulsar中,去重被broker處理的,這意味著你不需要修改你的客戶(hù)端代碼。 你只需要做一些管理上的變化(參考Managing message
deduplication )。
1.10.2 去重和實(shí)際一次語(yǔ)義
消息去重,使Pulsar成為與流處理引擎(SPE)或者其他尋求實(shí)際一次處理語(yǔ)義的系統(tǒng)連接的完美消息系統(tǒng)。
消息系統(tǒng)若不提供自動(dòng)消息去重,則需要SPE或者其他系統(tǒng)保證去重。這意味著嚴(yán)格的消息順序來(lái)自于讓程序承擔(dān)額外的去重工作。
使用Pulsar,嚴(yán)格的順序保證不會(huì)帶來(lái)任何應(yīng)用層面的代價(jià)。
結(jié)語(yǔ)
由于篇幅有限,本篇文章只講述Pulsar消息系統(tǒng)的基本概念,下篇文章重點(diǎn)講解Pulsar的架構(gòu)和客戶(hù)端庫(kù)使用教程。
參考文檔http://pulsar.apache.org/en/
?
熱門(mén)工具 換一換

感谢您访问我们的网站,您可能还对以下资源感兴趣:
调教肉文小说-国产成本人片免费av-空姐av种子无码-在线观看免费午夜视频-综合久久精品激情-国产成人丝袜视频在线观看软件-大芭区三区四区无码-啊啊好爽啊啊插啊用力啊啊-wanch视频网-国产精品成人a免费观看