Pulsar快速上手
前言
如果你還不了解Pulsar消息系統(tǒng),可以先看上一篇文章 企業(yè)級分布式消息系統(tǒng)-Pulsar入門基礎(chǔ)
<https://www.cnblogs.com/iceblow/p/11318650.html>
Pulsar客戶端支持多個(gè)語言,包括Java,Go,Pytho和C++,本篇文章只講述Java客戶端。
Pulsar Java客戶端既可用于創(chuàng)建消息的producers、consumers和readers ,也可用于執(zhí)行管理任務(wù)。Java 客戶端的當(dāng)前版本為
2.4.0。
1. 安裝
最新版本的Pulsar Java 客戶端庫可通過 Maven中央倉庫 使用。 要使用最新版本, 請將 pulsar-client 庫添加到構(gòu)建配置中。
1.1 Maven
如果你使用maven,添加以下內(nèi)容到你的 pom.xml 中:
<!-- 在你的 <properties> 部分--> <pulsar.version>2.4.0</pulsar.version> <!-- 在你的
<dependencies> 部分--> <dependency> <groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId> <version>${pulsar.version}</version>
</dependency>
?
1.2 Gradle
如果你使用Gradle,添加以下內(nèi)容到你的 build.gradle 中:
def pulsarVersion = '2.4.0' dependencies { compile group: 'org.apache.pulsar',
name: 'pulsar-client', version: pulsarVersion }
?
1.3 本地安裝Pulsar
Pulsar目前只支持MacOS和Linux系統(tǒng),JDK版本1.8及以上。
下載地址見下載說明及配置 <http://pulsar.apache.org/docs/en/standalone/>
,Windows的小伙伴們就不用下載了。
2.連接URL
要使用客戶端連接到Pulsar,你需要指定Pulsar 協(xié)議URL。
Pulsar協(xié)議URL分配給特定的集群,使用pulsar scheme ,默認(rèn)端口6650。以下是本地主機(jī)的示例:
pulsar://localhost:6650
?
如果有多個(gè)broker,那么URL如下:
pulsar://localhost:6550,localhost:6651,localhost:6652
?
生產(chǎn)環(huán)境的Pulsar 集群URL如下:
pulsar://pulsar.us-west.example.com:6650
?
如果需要TLS認(rèn)證,URL如下:
pulsar+ssl://pulsar.us-west.example.com:6651
?
3.客戶端配置
你可以用一個(gè)URL來實(shí)例化一個(gè)連接到指定的Pulsar 集群的 PulsarClient 對象,像這樣:
PulsarClient client = PulsarClient.builder() .serviceUrl(
"pulsar://localhost:6650") .build();
?
如果有多個(gè)brokers,實(shí)例化客戶端如下:
PulsarClient client = PulsarClient.builder() .serviceUrl(
"pulsar://localhost:6650,localhost:6651,localhost:6652") .build();
默認(rèn)的broker URL是單機(jī)集群。 如果你使用單機(jī)模式運(yùn)行一個(gè)集群,broker將默認(rèn)使用pulsar://localhost:6650
3.1 生產(chǎn)者
在Pulsar中,生產(chǎn)者寫消息到topic中。 一旦你實(shí)例化一個(gè)Pulsar Client對象,你可以創(chuàng)建一個(gè)Producer 用于特定的topic。
Producer<byte[]> producer = client.newProducer() .topic("my-topic") .create();
// 然后你就可以發(fā)送消息到指定的broker 和topic上: producer.send("My message".getBytes());
?
默認(rèn)情況下,生產(chǎn)者生成由字節(jié)數(shù)組組成的消息。當(dāng)然,你也可以指定消息類型,例如下面的String類型:
Producer<String> stringProducer = client.newProducer(Schema.STRING) .topic(
"my-topic") .create(); stringProducer.send("My message");
在不再使用時(shí),你需要確保關(guān)閉生產(chǎn)者、消費(fèi)者和客戶端
producer.close(); consumer.close(); client.close();
關(guān)閉操作也可以是異步的: //...業(yè)務(wù)代碼 producer.closeAsync() .thenRun(() ->
System.out.println("Producer closed")); .exceptionally((ex) -> {
System.err.println("Failed to close producer: " + ex); return ex; });
?
3.1.1 生產(chǎn)者配置
如果實(shí)例化生產(chǎn)者對象時(shí)僅指定topic名稱 (如上面的示例所示), 則生產(chǎn)者將使用默認(rèn)配置。 要使用非默認(rèn)配置, 你可以設(shè)置多種可配置的參數(shù)。詳情見
ProducerBuilder的文檔說明
<http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ProducerBuilder>
,下面是一個(gè)示例:
Producer<byte[]> producer = client.newProducer() .topic("my-topic") //主題名稱
.batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)//最大發(fā)布延遲時(shí)間 .sendTimeout(10,
TimeUnit.SECONDS)//超時(shí)時(shí)間 .blockIfQueueFull(true) //隊(duì)列滿了,是否阻塞 .create();
?
3.1.2 消息路由 #####
使用分區(qū)主題時(shí),當(dāng)你使用生產(chǎn)者發(fā)布消息時(shí)你可以指定路由模式。
3.1.3 異步發(fā)送
你可以使用Java客戶端異步發(fā)布消息。 使用異步發(fā)送,生產(chǎn)者將消息放入阻塞隊(duì)列并立即返回。 然后,客戶端將在后臺將消息發(fā)送給broker。
如果隊(duì)列已滿(配置的最大值),則在調(diào)用API時(shí),生產(chǎn)者可能會(huì)被阻塞或立即失敗,具體取決于傳遞給生產(chǎn)者的參數(shù)。
以下是異步發(fā)送操作的示例:
producer.sendAsync("my-async-message".getBytes()).thenAccept(msgId -> {
System.out.printf("Message with ID %s successfully sent", msgId); });
?
3.1.4 消息配置
除了value之外, 還可以在特定消息上設(shè)置其他選項(xiàng):
producer.newMessage() .key("my-message-key") //消息的key
.value("my-async-message".getBytes())//消息內(nèi)容的字節(jié)數(shù)組 .property("my-key", "my-value")
//自定義的key/value .property("my-other-key", "my-other-value") .send();
?
3.2 消費(fèi)者
在Pulsar中,消費(fèi)者訂閱topic并處理生產(chǎn)者發(fā)布到這些topic的消息。 你可以首先實(shí)例化一個(gè)PulsarClient對象并傳給他一個(gè)borker
URL(和生產(chǎn)樣的一樣)來實(shí)例化一個(gè)消費(fèi)者。
一旦實(shí)例化一個(gè)PulsarClient 對象,你可以指定一個(gè)主題和一個(gè)訂閱來創(chuàng)建一個(gè) Consumer 消費(fèi)者。
Consumer consumer = client.newConsumer() .topic("my-topic") //生產(chǎn)者定義的topic
.subscriptionName("my-subscription")//消費(fèi)者自定義的訂閱名稱 .subscribe();
?
subscribe()方法將自動(dòng)將訂閱消費(fèi)者指定的主題, 一種讓消費(fèi)者監(jiān)聽主題的方法是使用while循環(huán),示例如下:
while (true) { // 等待一個(gè)消息 Message msg = consumer.receive(); try { //
對這個(gè)消息的處理(業(yè)務(wù)) System.out.printf("Message received: %s", new
String(msg.getData()));// 消費(fèi)者確認(rèn)消息已消費(fèi),同時(shí)broker刪除該消息 consumer.acknowledge(msg); }
catch (Exception e) { // 消息處理失敗,否定確認(rèn),該消息稍后會(huì)重發(fā)
consumer.negativeAcknowledge(msg); } }
?
3.2.1 消費(fèi)者配置
如果實(shí)例化 消費(fèi)者對象, 僅指定主題和訂閱名稱, 如上面的示例所示, 消費(fèi)者將采用默認(rèn)配置。 要使用非默認(rèn)配置, 你可以設(shè)置多種可配置的參數(shù)。
詳情見ConsumerBuilder的說明
<http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/ConsumerBuilder>
,下面是一個(gè)示例:
Consumer consumer = client.newConsumer() .topic("my-topic") .subscriptionName(
"my-subscription") .ackTimeout(10, TimeUnit.SECONDS) //確認(rèn)超時(shí)時(shí)間
.subscriptionType(SubscriptionType.Exclusive)//訂閱模式 .subscribe();
?
3.2.2 異步接收
receive方法將異步接受消息(消費(fèi)者處理器將被阻塞,直到有消息到達(dá))。
你也可以使用異步接收方法,這將在一個(gè)新消息到達(dá)時(shí)立即返回一個(gè)CompletableFuture對象。示例如下:
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
?
3.2.3 多主題訂閱
消費(fèi)者除了訂閱單個(gè)Pulsar主題外,你還可以使用多主題訂閱訂閱多個(gè)主題。 若要使用多主題訂閱, 可以提供一個(gè)topic正則表達(dá)式 (regex) 或
主題List 。 如果通過 regex 選擇主題, 則所有主題都必須位于同一Pulsar命名空間中。
下面是一些示例:
import org.apache.pulsar.client.api.Consumer; import
org.apache.pulsar.client.api.PulsarClient;import java.util.Arrays; import
java.util.List;import java.util.regex.Pattern; ConsumerBuilder consumerBuilder =
pulsarClient.newConsumer() .subscriptionName(subscription);// 訂閱命名空間中的所有主題
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*"
); Consumer allTopicsConsumer= consumerBuilder
.topicsPattern(allTopicsInNamespace) .subscribe();// 使用regex訂閱命名空間中的主題子集
Pattern someTopicsInNamespace =
Pattern.compile("persistent://public/default/foo.*"); Consumer allTopicsConsumer
= consumerBuilder .topicsPattern(someTopicsInNamespace) .subscribe();
?
你還可以訂閱明確的主題列表 (可跨命名空間):
List<String> topics = Arrays.asList( "topic-1", "topic-2", "topic-3" );
Consumer multiTopicConsumer= consumerBuilder .topics(topics) .subscribe(); //
或者: Consumer multiTopicConsumer = consumerBuilder .topics( "topic-1", "topic-2",
"topic-3" ) .subscribe();
?
你也可以使用subscribeAsync 方法異步訂閱多主題,下面是一個(gè)示例:
Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default.*"
); consumerBuilder .topics(topics) .subscribeAsync() .thenAccept(this
::receiveMessageFromConsumer);private void receiveMessageFromConsumer(Consumer
consumer) { consumer.receiveAsync().thenAccept(message-> { // 業(yè)務(wù)處理
receiveMessageFromConsumer(consumer); }); }
?
3.2.4 訂閱模型
Pulsar有多種訂閱模型來適用不同的場景,訂閱模型見Pulsar基礎(chǔ)概念
<https://www.cnblogs.com/iceblow/p/11318650.html>,下面講述如何使用。
為了更好的描述他們之間的不同,假設(shè)你創(chuàng)建了一個(gè)topic,命名為"my-topic",生產(chǎn)者發(fā)布了10條消息,示例如下:
//創(chuàng)建生產(chǎn)者 Producer<String> producer = client.newProducer(Schema.STRING) .topic(
"my-topic") .enableBatch(false) .create(); // "key-1"的消息有3條 // "key-2"的消息有3條 //
"key-3"的消息有2條// "key-4"的消息有2條
producer.newMessage().key("key-1").value("message-1-1").send();
producer.newMessage().key("key-1").value("message-1-2").send();
producer.newMessage().key("key-1").value("message-1-3").send();
producer.newMessage().key("key-2").value("message-2-1").send();
producer.newMessage().key("key-2").value("message-2-2").send();
producer.newMessage().key("key-2").value("message-2-3").send();
producer.newMessage().key("key-3").value("message-3-1").send();
producer.newMessage().key("key-3").value("message-3-2").send();
producer.newMessage().key("key-4").value("message-4-1").send();
producer.newMessage().key("key-4").value("message-4-2").send();
?
Exclusive(獨(dú)占模式):
創(chuàng)建一個(gè)消費(fèi)者,以Exclusive模式訂閱消息,代碼如下:
Consumer consumer = client.newConsumer() .topic("my-topic") .subscriptionName(
"my-subscription") .subscriptionType(SubscriptionType.Exclusive) //獨(dú)占模式
.subscribe()
?
只有第一個(gè)消費(fèi)者可以訂閱,其他消費(fèi)者訂閱會(huì)報(bào)錯(cuò)。這就意味著第一個(gè)消費(fèi)者可以收到所有的10條消息,消息消費(fèi)的順序和生產(chǎn)的順序是一樣的。
Failover(災(zāi)備):
創(chuàng)建一個(gè)消費(fèi)者,以Exclusive模式訂閱消息,代碼如下:
//創(chuàng)建消費(fèi)者1 Consumer consumer1 = client.newConsumer() .topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)//災(zāi)備模式 .subscribe() //創(chuàng)建消費(fèi)者2
Consumer consumer2 = client.newConsumer() .topic("my-topic") .subscriptionName(
"my-subscription") .subscriptionType(SubscriptionType.Failover) //災(zāi)備模式
.subscribe()
?
conumser1是起作用的消費(fèi)者, consumer2是備用消費(fèi)者。假設(shè)consumer1收到的5條消息后突然崩了,
那么consumer2接替,成了起作用的消費(fèi)者。
當(dāng)然多個(gè)消費(fèi)者都可以訂閱,但是只有第一個(gè)是可用,第一個(gè)消費(fèi)者斷開連接后,下一個(gè)備用的消費(fèi)者就起作用了。
Shared(共享):
創(chuàng)建一個(gè)消費(fèi)者,以Exclusive模式訂閱消息,代碼如下:
Consumer consumer1 = client.newConsumer() .topic("my-topic") .subscriptionName(
"my-subscription") .subscriptionType(SubscriptionType.Shared) //共享模式
.subscribe() Consumer consumer2= client.newConsumer() .topic("my-topic")
.subscriptionName("my-subscription") .subscriptionType(SubscriptionType.Shared)
.subscribe()//這兩個(gè)消費(fèi)者都是可用的
?
在共享模式,多個(gè)消費(fèi)者都可以訂閱,消息在多個(gè)消費(fèi)者之間是以輪詢的方式分發(fā)。
如果broke同一時(shí)間只發(fā)送一個(gè)消息,那么consume1收到5條消息:
("key-1", "message-1-1") ("key-1", "message-1-3") ("key-2", "message-2-2") (
"key-3", "message-3-1") ("key-4", "message-4-1")
?
消費(fèi)者2收到另外5條消息。
總之,共享模式和其他兩種模式不同,共享模式有更好的靈活性,但是不能保證消息的順序。
Key_share
這是2.4.0版本后新出的訂閱模式,代碼如下:
Consumer consumer1 = client.newConsumer() .topic("my-topic") .subscriptionName(
"my-subscription") .subscriptionType(SubscriptionType.Key_Shared) //key共享模式
.subscribe() Consumer consumer2= client.newConsumer() .topic("my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared) .subscribe()
?
KeyShared和Shared模式類似,區(qū)別在于KeyShared模式下,具有相同key的消息分發(fā)到同一個(gè)消費(fèi)者。
消費(fèi)者1最后收到5條消息:
("key-1", "message-1-1") ("key-1", "message-1-2") ("key-1", "message-1-3") (
"key-3", "message-3-1") ("key-3", "message-3-2")
?
消費(fèi)者2收到另外5條。
如果該模式下消息的key沒有指定,那么所有的消息默認(rèn)分發(fā)到同一消費(fèi)者。
3.2.5 Reader接口
使用 reader 接口, Pulsar客戶可以在topic中“手動(dòng)定位”,從指定的消息開始向前讀取所有消息。Pulsar Java API
可以創(chuàng)建Reader對象,同時(shí)指定一個(gè) topic, 一個(gè)MessageId ,和ReaderConfiguration。
下面是一個(gè)示例:
ReaderConfiguration conf = new ReaderConfiguration(); byte[] msgIdBytes = //
一些消息ID 的字節(jié)數(shù)組 MessageId id = MessageId.fromByteArray(msgIdBytes); Reader reader =
pulsarClient.newReader() .topic(topic) .startMessageId(id) .create();while (
true) { Message message = reader.readNext(); // 處理消息 }
?
在上面的示例中,實(shí)例化一個(gè)Reader對象指定的主題和消息(ID); reader將遍歷主題中msgIdBytes(取值方式取決于應(yīng)用程序) 之后的消息。
上面的示例代碼展示了Reader對象指向特定的消息(ID),但你也可以使用MessageId.earliest來指向topic上最早可用的消息,使用MessageId.latest指向最新的消息。
3.3 Schema
在Pulsar中,所有的消息數(shù)據(jù)都在字節(jié)數(shù)組中,消息schema允許在構(gòu)造和處理消息時(shí)使用其他類型的數(shù)據(jù)(從簡單類型(如String)到更復(fù)雜的類型)。如果在不指定schema的情況下構(gòu)造生產(chǎn)者,則生產(chǎn)者只能生成類型為
byte[]的消息。 下面是一個(gè)示例:
Producer<byte[]> producer = client.newProducer() .topic(topic) .create();
?
以下schema格式目前可用于 Java:
* 無schema 或者字節(jié)數(shù)組schema(使用Schema.BYTES)? Producer<byte[]> bytesProducer =
client.newProducer(Schema.BYTES) .topic("some-raw-bytes-topic") .create();
* String,UTF-8編碼,使用Schema.STRING? Producer<String> stringProducer =
client.newProducer(Schema.STRING) .topic("some-string-topic") .create();
* JSON 模式,創(chuàng)建POJO? Schema<MyPojo> pojoSchema = JSONSchema.of(MyPojo.class);
Producer<MyPojo> pojoProducer = client.newProducer(pojoSchema)
.topic("some-pojo-topic") .create();
?
結(jié)語
Pulsar的特性還有很多,這里重點(diǎn)介紹了Java客戶端的快速上手教程,后面有時(shí)間的話會(huì)繼續(xù)更新Pulsar系列。
參考文檔 http://pulsar.apache.org/docs/en/client-libraries-java/
?
熱門工具 換一換
感谢您访问我们的网站,您可能还对以下资源感兴趣:
调教肉文小说-国产成本人片免费av-空姐av种子无码-在线观看免费午夜视频-综合久久精品激情-国产成人丝袜视频在线观看软件-大芭区三区四区无码-啊啊好爽啊啊插啊用力啊啊-wanch视频网-国产精品成人a免费观看