<ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>


      EMQ介紹
        EMQ?(Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 平臺(tái)開(kāi)發(fā)的開(kāi)源物聯(lián)網(wǎng) MQTT
      消息服務(wù)器。Erlang/OTP 是出色的軟實(shí)時(shí)(Soft-Realtime)、低延時(shí)(Low-Latency)、分布式(Distributed)
      的語(yǔ)言平臺(tái)。MQTT 是輕量的(Lightweight)、發(fā)布訂閱模式(PubSub) 的物聯(lián)網(wǎng)(IoT)消息協(xié)議。 ? 訂閱(pub)/發(fā)布(sub)模式 ?
      消息隊(duì)列中的廣播(fanout)模式 ? ? 輕量化:docker鏡像都才88.4MB ? ?
      一些關(guān)于常用EMQTT的快速鏈接:
      官網(wǎng)API地址:http://www.emqtt.com/docs/v2/
      <https://yq.aliyun.com/go/articleRenderRedirect?url=http%3A%2F%2Fwww.emqtt.com%2Fdocs%2Fv2%2F>
      開(kāi)源項(xiàng)目地址:https://github.com/emqtt
      <https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fgithub.com%2Femqtt>
      Docker安裝模式:http://www.emqtt.com/docs/v2/install.html#docker
      <https://yq.aliyun.com/go/articleRenderRedirect?url=http%3A%2F%2Fwww.emqtt.com%2Fdocs%2Fv2%2Finstall.html%23docker>
      MQTT介紹和場(chǎng)景:https://www.mqtt.com/
      <https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fwww.mqtt.com%2F>
      ?
      安裝過(guò)程
      假如你的centos上已經(jīng)安裝了Docker,并pull了devicexx/emqttd這個(gè)鏡像,輸入如下命令 docker run -dit
      --name=sample_emqtt --restart=always -p18083:18083 -p 1883:1883 -p 8083:8083 -p
      8883:8883 998429a869e8
      ?
      ? 確保映射如下幾個(gè)端口 1883、8083、8883:這三個(gè)是基于EMQTT傳輸通訊的端口 18083:這個(gè)是EMQTT?Web控制臺(tái)的端口 ?
      我的虛擬機(jī)IP網(wǎng)段是153.132,直接輸入http://192.168.153.132:18083進(jìn)入Web控制臺(tái),默認(rèn)用戶名和密碼是admin,
      public 至此EMQTT已經(jīng)順利安裝完成。 ?
      在NET Core中使用EMQ:
      emqtt官網(wǎng)沒(méi)有提供.net的客戶端,在nuget和github上找到了實(shí)現(xiàn)了MQTT協(xié)議的公共組件。 ? MQTTnet nuget:?
      https://www.nuget.org/packages/MQTTnet/
      <https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fwww.nuget.org%2Fpackages%2FMQTTnet%2F>
      MQTTnet github:?https://github.com/chkr1011/MQTTnet
      <https://yq.aliyun.com/go/articleRenderRedirect?url=https%3A%2F%2Fgithub.com%2Fchkr1011%2FMQTTnet>
      ?
      MQTTnet Client的幾個(gè)主要事件:

      ApplicationMessageReceived:MQTTnet中主要方法事件,當(dāng)接收到消息的時(shí)候,該事件觸發(fā)。包含Topic,Payload,Qos,Retain主要成員。
      Connected:客戶端成功連接時(shí)觸發(fā)。 Disconnected:客戶端丟失連接時(shí)觸發(fā)。 ?
      創(chuàng)建一個(gè)發(fā)布者
      先創(chuàng)建一個(gè)發(fā)布者Publisher的配置類,代碼簡(jiǎn)單: public static class PublisherConfig { ///
      <summary> /// 創(chuàng)建mqtt的客戶端接口實(shí)例 /// </summary> public static readonly IMqttClient
      Client = new MqttFactory().CreateMqttClient(); /// <summary> /// 創(chuàng)建mqtt配置選項(xiàng) ///
      </summary> public static readonly MqttClientOptions ClientOptions = new
      MqttClientOptions {// 通道選項(xiàng) ChannelOptions = new MqttClientTcpOptions { Server =
      "192.168.153.132" }, /* * 客戶端ID *
      在MQTTnet框架中,當(dāng)ClientId未賦值,將使用默認(rèn)的GUID生成默認(rèn)的ClientId*/ ClientId = "Client_publisher"
      ,// 客戶端認(rèn)證 Credentials = new MqttClientCredentials { Username = "clientUser_01",
      Password= "123123" } }; public const string Topic =
      MQTT_Common.Config.Topic.Name; }
      ?

      ?為了測(cè)試方便,創(chuàng)建了一個(gè)全局固定的Topic名稱
      public static class Topic { public const string Name = "/WorldTopic"; }
      ?

      再創(chuàng)建一個(gè)發(fā)布者Publisher,代碼簡(jiǎn)單
      public static class Publisher { private static readonly IMqttClient Client =
      PublisherConfig.Client; private static readonly MqttClientOptions ClientOptions=
      PublisherConfig.ClientOptions; private static readonlystring Topic =
      PublisherConfig.Topic; public static void RunAsync() { try { Console.WriteLine("
      publisher is running"); CreateConnection(); LoopInput().Wait(); } catch
      (Exception exception) { Console.WriteLine(exception); } } private static void
      CreateConnection() { CommonEventHandler.EventHandler(Client, ClientOptions);
      CommonEventHandler.TryConnectionAsync(Client, ClientOptions); } private static
      async Task LoopInput() {while (true) { await Client.SubscribeAsync("/World");
      Console.WriteLine("輸入消息數(shù)據(jù):"); var r = Console.ReadLine(); var applicationMessage
      = new MqttApplicationMessageBuilder() .WithTopic(Topic) // 設(shè)置主題 .WithPayload(r)
      // 設(shè)置載荷(消息內(nèi)容) .WithAtLeastOnceQoS() // 設(shè)置質(zhì)量 .WithRetainFlag(false) // 設(shè)置持久化
      .Build(); await Client.PublishAsync(applicationMessage); } }
      ?

      ?創(chuàng)建一個(gè)或多個(gè)訂閱者
      public static class SubscriberConfig01 { public static readonly IMqttClient
      Client= new MqttFactory().CreateMqttClient(); public static readonly
      MqttClientOptions ClientOptions= new MqttClientOptions { ChannelOptions = new
      MqttClientTcpOptions { Server= "192.168.153.132" }, // 唯一需要修改的Client唯一ID
      ClientId ="Client_01", Credentials = new MqttClientCredentials { //
      用戶可用多個(gè),也可不啟用客戶端驗(yàn)證 Username = "clientUser_02", Password = "123123" } }; public
      conststring Topic = MQTT_Common.Config.Topic.Name; }
      ?

      MQTTnet中所有連接端都是Client,所以唯一的變化是ClientID這個(gè)值,可使用系統(tǒng)GUID自動(dòng)生成,也可以使用不同的Client限定名。
      public static class Subscriber01 { private static readonly IMqttClient Client =
      SubscriberConfig01.Client;private static readonly MqttClientOptions
      ClientOptions = SubscriberConfig01.ClientOptions; private static readonly string
      Topic = SubscriberConfig01.Topic; public static async Task RunAsync() { try {
      Console.WriteLine("subscriber 1 is running"); CreateConnection(); //
      注冊(cè)/訂閱Topic主題 await Client.SubscribeAsync(Topic); Console.ReadKey(); } catch
      (Exception exception) { Console.WriteLine(exception); } }private static void
      CreateConnection() { CommonEventHandler.EventHandler(Client, ClientOptions);
      CommonEventHandler.TryConnectionAsync(Client, ClientOptions); } }
      ?

      公共函數(shù)CommonEventHandler,用于公共事件處理和連接嘗試
      public static class CommonEventHandler { /// <summary> /// 添加Client相關(guān)事件處理函數(shù) ///
      </summary> /// <param name="client">IMqttClient客戶端</param> /// <param
      name="clientOptions">MqttClientOptions配置選項(xiàng)</param> /// <param name="subscribe">
      主題Topic名稱</param> public static void EventHandler(IMqttClient client,
      MqttClientOptions clientOptions,string subscribe = "#") {
      client.ApplicationMessageReceived+= (s, e) => { Console.WriteLine("### 收到程序消息
      ###"); Console.WriteLine($"+ [主題]Topic = {e.ApplicationMessage.Topic}");
      Console.WriteLine($"+ [載荷]Payload =
      {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}"); Console.WriteLine($"
      + [質(zhì)量]QoS = {e.ApplicationMessage.QualityOfServiceLevel}"); Console.WriteLine($"
      + [持久]Retain = {e.ApplicationMessage.Retain}"); Console.WriteLine(); };
      client.Connected+= async (s, e) => { Console.WriteLine("### 成功連接MQTT ###"); if
      (!subscribe.Equals("#")) return; await client.SubscribeAsync(new
      TopicFilterBuilder().WithTopic("#").Build()); Console.WriteLine($"###
      成功訂閱主題[{subscribe}] ###"); }; client.Disconnected += async (s, e) => {
      Console.WriteLine("### 連接丟失 ###"); await Task.Delay(TimeSpan.FromSeconds(5));
      try { await client.ConnectAsync(clientOptions); } catch { Console.WriteLine("
      ### 連接錯(cuò)誤 ###"); } }; } /// <summary> /// 嘗試Client連接到EMQTT /// </summary> ///
      <param name="client">IMqttClient客戶端</param> /// <param name="clientOptions">
      MqttClientOptions配置選項(xiàng)</param> public static void TryConnectionAsync(IMqttClient
      client, MqttClientOptions clientOptions) {try {
      client.ConnectAsync(clientOptions).Wait(); }catch (Exception exception) {
      Console.WriteLine("### 連接錯(cuò)誤 ###" + Environment.NewLine + exception); } } }
      ?

      配置三個(gè)客戶端,一個(gè)作為發(fā)布者,兩個(gè)作為訂閱者,通過(guò)發(fā)布者輸入任意數(shù)據(jù),運(yùn)行結(jié)果如下:

      其中發(fā)布者客戶端會(huì)收到一個(gè)Retain=True的消息,是因?yàn)橹坝邪l(fā)布過(guò)一條持久化的消息,該消息已經(jīng)存儲(chǔ)在EMQTT中,每次啟動(dòng)均會(huì)主動(dòng)向訂閱過(guò)"/World"的訂閱者再次推送該條持久化的消息。
      根據(jù)生產(chǎn)消費(fèi)的機(jī)制,此處消息理當(dāng)已經(jīng)被消費(fèi)和清除,但不清楚為何EMQ仍然存在。 ?
      總結(jié)
      本篇簡(jiǎn)單介紹使用EMQTT的PubSub模式,并使用MQTTnet客戶端連接并訂閱和發(fā)布消息內(nèi)容。 ? ?

      友情鏈接
      ioDraw流程圖
      API參考文檔
      OK工具箱
      云服務(wù)器優(yōu)惠
      阿里云優(yōu)惠券
      騰訊云優(yōu)惠券
      京東云優(yōu)惠券
      站點(diǎn)信息
      問(wèn)題反饋
      郵箱:[email protected]
      QQ群:637538335
      關(guān)注微信

        <ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>
          影音先锋激情网 | A片网站在线观看 | 插骚逼网站 | 国产乱伦导航 | 亚洲午夜影视 | www.天堂在线 | 大操逼| 91人妻人人澡人人爽人人精品乱 | 麻豆的视频高清在线观看完整 | 尹人大香蕉网 |