EMQ介紹
EMQ?(Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平臺(tái)開(kāi)發(fā)的開(kāi)源物聯(lián)網(wǎng) MQTT
消息服務(wù)器。Erlang/OTP 是出色的軟實(shí)時(shí)(Soft-Realtime)、低延時(shí)(Low-Latency)、分布式(Distributed)
的語(yǔ)言平臺(tái)。MQTT 是輕量的(Lightweight)、發(fā)布訂閱模式(PubSub) 的物聯(lián)網(wǎng)(IoT)消息協(xié)議。 ? 訂閱(pub)/發(fā)布(sub)模式 ?
消息隊(duì)列中的廣播(fanout)模式 ? ? 輕量化:docker鏡像都才88.4MB ? ?
一些關(guān)于常用EMQTT的快速鏈接:
官網(wǎng)API地址:http://www.emqtt.com/docs/v2/
<https://yq.aliyun.com/go/articleRenderRedirect?url=http%3A%2F%2Fwww.emqtt.com%2Fdocs%2Fv2%2F>
開(kāi)源項(xiàng)目地址:https://github.com/emqtt
<https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fgithub.com%2Femqtt>
Docker安裝模式:http://www.emqtt.com/docs/v2/install.html#docker
<https://yq.aliyun.com/go/articleRenderRedirect?url=http%3A%2F%2Fwww.emqtt.com%2Fdocs%2Fv2%2Finstall.html%23docker>
MQTT介紹和場(chǎng)景:https://www.mqtt.com/
<https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fwww.mqtt.com%2F>
?
安裝過(guò)程
假如你的centos上已經(jīng)安裝了Docker,并pull了devicexx/emqttd這個(gè)鏡像,輸入如下命令 docker run -dit
--name=sample_emqtt --restart=always -p18083:18083 -p 1883:1883 -p 8083:8083 -p
8883:8883 998429a869e8
?
? 確保映射如下幾個(gè)端口 1883、8083、8883:這三個(gè)是基于EMQTT傳輸通訊的端口 18083:這個(gè)是EMQTT?Web控制臺(tái)的端口 ?
我的虛擬機(jī)IP網(wǎng)段是153.132,直接輸入http://192.168.153.132:18083進(jìn)入Web控制臺(tái),默認(rèn)用戶名和密碼是admin,
public 至此EMQTT已經(jīng)順利安裝完成。 ?
在NET Core中使用EMQ:
emqtt官網(wǎng)沒(méi)有提供.net的客戶端,在nuget和github上找到了實(shí)現(xiàn)了MQTT協(xié)議的公共組件。 ? MQTTnet nuget:?
https://www.nuget.org/packages/MQTTnet/
<https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fwww.nuget.org%2Fpackages%2FMQTTnet%2F>
MQTTnet github:?https://github.com/chkr1011/MQTTnet
<https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fgithub.com%2Fchkr1011%2FMQTTnet>
?
MQTTnet Client的幾個(gè)主要事件:
ApplicationMessageReceived:MQTTnet中主要方法事件,當(dāng)接收到消息的時(shí)候,該事件觸發(fā)。包含Topic,Payload,Qos,Retain主要成員。
Connected:客戶端成功連接時(shí)觸發(fā)。 Disconnected:客戶端丟失連接時(shí)觸發(fā)。 ?
創(chuàng)建一個(gè)發(fā)布者
先創(chuàng)建一個(gè)發(fā)布者Publisher的配置類,代碼簡(jiǎn)單: public static class PublisherConfig { ///
<summary> /// 創(chuàng)建mqtt的客戶端接口實(shí)例 /// </summary> public static readonly IMqttClient
Client = new MqttFactory().CreateMqttClient(); /// <summary> /// 創(chuàng)建mqtt配置選項(xiàng) ///
</summary> public static readonly MqttClientOptions ClientOptions = new
MqttClientOptions {// 通道選項(xiàng) ChannelOptions = new MqttClientTcpOptions { Server =
"192.168.153.132" }, /* * 客戶端ID *
在MQTTnet框架中,當(dāng)ClientId未賦值,將使用默認(rèn)的GUID生成默認(rèn)的ClientId*/ ClientId = "Client_publisher"
,// 客戶端認(rèn)證 Credentials = new MqttClientCredentials { Username = "clientUser_01",
Password= "123123" } }; public const string Topic =
MQTT_Common.Config.Topic.Name; }
?
?為了測(cè)試方便,創(chuàng)建了一個(gè)全局固定的Topic名稱
public static class Topic { public const string Name = "/WorldTopic"; }
?
再創(chuàng)建一個(gè)發(fā)布者Publisher,代碼簡(jiǎn)單
public static class Publisher { private static readonly IMqttClient Client =
PublisherConfig.Client; private static readonly MqttClientOptions ClientOptions=
PublisherConfig.ClientOptions; private static readonlystring Topic =
PublisherConfig.Topic; public static void RunAsync() { try { Console.WriteLine("
publisher is running"); CreateConnection(); LoopInput().Wait(); } catch
(Exception exception) { Console.WriteLine(exception); } } private static void
CreateConnection() { CommonEventHandler.EventHandler(Client, ClientOptions);
CommonEventHandler.TryConnectionAsync(Client, ClientOptions); } private static
async Task LoopInput() {while (true) { await Client.SubscribeAsync("/World");
Console.WriteLine("輸入消息數(shù)據(jù):"); var r = Console.ReadLine(); var applicationMessage
= new MqttApplicationMessageBuilder() .WithTopic(Topic) // 設(shè)置主題 .WithPayload(r)
// 設(shè)置載荷(消息內(nèi)容) .WithAtLeastOnceQoS() // 設(shè)置質(zhì)量 .WithRetainFlag(false) // 設(shè)置持久化
.Build(); await Client.PublishAsync(applicationMessage); } }
?
?創(chuàng)建一個(gè)或多個(gè)訂閱者
public static class SubscriberConfig01 { public static readonly IMqttClient
Client= new MqttFactory().CreateMqttClient(); public static readonly
MqttClientOptions ClientOptions= new MqttClientOptions { ChannelOptions = new
MqttClientTcpOptions { Server= "192.168.153.132" }, // 唯一需要修改的Client唯一ID
ClientId ="Client_01", Credentials = new MqttClientCredentials { //
用戶可用多個(gè),也可不啟用客戶端驗(yàn)證 Username = "clientUser_02", Password = "123123" } }; public
conststring Topic = MQTT_Common.Config.Topic.Name; }
?
MQTTnet中所有連接端都是Client,所以唯一的變化是ClientID這個(gè)值,可使用系統(tǒng)GUID自動(dòng)生成,也可以使用不同的Client限定名。
public static class Subscriber01 { private static readonly IMqttClient Client =
SubscriberConfig01.Client;private static readonly MqttClientOptions
ClientOptions = SubscriberConfig01.ClientOptions; private static readonly string
Topic = SubscriberConfig01.Topic; public static async Task RunAsync() { try {
Console.WriteLine("subscriber 1 is running"); CreateConnection(); //
注冊(cè)/訂閱Topic主題 await Client.SubscribeAsync(Topic); Console.ReadKey(); } catch
(Exception exception) { Console.WriteLine(exception); } }private static void
CreateConnection() { CommonEventHandler.EventHandler(Client, ClientOptions);
CommonEventHandler.TryConnectionAsync(Client, ClientOptions); } }
?
公共函數(shù)CommonEventHandler,用于公共事件處理和連接嘗試
public static class CommonEventHandler { /// <summary> /// 添加Client相關(guān)事件處理函數(shù) ///
</summary> /// <param name="client">IMqttClient客戶端</param> /// <param
name="clientOptions">MqttClientOptions配置選項(xiàng)</param> /// <param name="subscribe">
主題Topic名稱</param> public static void EventHandler(IMqttClient client,
MqttClientOptions clientOptions,string subscribe = "#") {
client.ApplicationMessageReceived+= (s, e) => { Console.WriteLine("### 收到程序消息
###"); Console.WriteLine($"+ [主題]Topic = {e.ApplicationMessage.Topic}");
Console.WriteLine($"+ [載荷]Payload =
{Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); Console.WriteLine($"
+ [質(zhì)量]QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); Console.WriteLine($"
+ [持久]Retain = {e.ApplicationMessage.Retain}"); Console.WriteLine(); };
client.Connected+= async (s, e) => { Console.WriteLine("### 成功連接MQTT ###"); if
(!subscribe.Equals("#")) return; await client.SubscribeAsync(new
TopicFilterBuilder().WithTopic("#").Build()); Console.WriteLine($"###
成功訂閱主題[{subscribe}] ###"); }; client.Disconnected += async (s, e) => {
Console.WriteLine("### 連接丟失 ###"); await Task.Delay(TimeSpan.FromSeconds(5));
try { await client.ConnectAsync(clientOptions); } catch { Console.WriteLine("
### 連接錯(cuò)誤 ###"); } }; } /// <summary> /// 嘗試Client連接到EMQTT /// </summary> ///
<param name="client">IMqttClient客戶端</param> /// <param name="clientOptions">
MqttClientOptions配置選項(xiàng)</param> public static void TryConnectionAsync(IMqttClient
client, MqttClientOptions clientOptions) {try {
client.ConnectAsync(clientOptions).Wait(); }catch (Exception exception) {
Console.WriteLine("### 連接錯(cuò)誤 ###" + Environment.NewLine + exception); } } }
?
配置三個(gè)客戶端,一個(gè)作為發(fā)布者,兩個(gè)作為訂閱者,通過(guò)發(fā)布者輸入任意數(shù)據(jù),運(yùn)行結(jié)果如下:
其中發(fā)布者客戶端會(huì)收到一個(gè)Retain=True的消息,是因?yàn)橹坝邪l(fā)布過(guò)一條持久化的消息,該消息已經(jīng)存儲(chǔ)在EMQTT中,每次啟動(dòng)均會(huì)主動(dòng)向訂閱過(guò)"/World"的訂閱者再次推送該條持久化的消息。
根據(jù)生產(chǎn)消費(fèi)的機(jī)制,此處消息理當(dāng)已經(jīng)被消費(fèi)和清除,但不清楚為何EMQ仍然存在。 ?
總結(jié)
本篇簡(jiǎn)單介紹使用EMQTT的PubSub模式,并使用MQTTnet客戶端連接并訂閱和發(fā)布消息內(nèi)容。 ? ?
熱門(mén)工具 換一換
