<ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>


      引言

        熟悉TPL Dataflow <https://www.cnblogs.com/JulianHuang/p/11177766.html>
      博文的朋友可能記得這是個(gè)單體程序,使用TPL Dataflow 處理工作流任務(wù), 在使用Docker部署的過程中, 有一個(gè)問題一直無法回避:

      ? ? ? ?在單體程序部署的瞬間(服務(wù)不可用)會(huì)有少量流量無法處理;更糟糕的情況下,迭代部署的這個(gè)版本有問題,上線后無法運(yùn)作, 更多的流量沒有得到處理。

      ? ? ? 背負(fù)神圣使命(巨大壓力)的程序猿心生一計(jì), 為何不將單體程序改成分布式:服務(wù)A只接受數(shù)據(jù),服務(wù)B只處理數(shù)據(jù)。

      ?

      知識(shí)儲(chǔ)備

      ? ? 消息隊(duì)列和訂閱發(fā)布作為老生常談的兩個(gè)知識(shí)點(diǎn)被反復(fù)提及,按照J(rèn)MS的規(guī)范, 官方稱為點(diǎn)對(duì)點(diǎn)(point to point, queue) 和
      訂閱發(fā)布(publish/subscribe,topic ),

      點(diǎn)對(duì)點(diǎn)

        消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消費(fèi)者從queue中取出并且消費(fèi)消息。

      注意:

      消息被消費(fèi)以后,queue中不再有存儲(chǔ),所以消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。

      Queue支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。

      當(dāng)沒有消費(fèi)者可用時(shí),這個(gè)消息會(huì)被保存直到有 一個(gè)可用的消費(fèi)者。

      發(fā)布/訂閱

        消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到topic的消息會(huì)被所有訂閱者消費(fèi)。

      注意:

      發(fā)布者將消息發(fā)布到通道中,而不用知道訂閱者是誰(不關(guān)注是否存在訂閱者);訂閱者可收聽自己感興趣的多個(gè)通道, 也不需要知道發(fā)布者是誰(不關(guān)注是哪個(gè)發(fā)布者)。

      故如果沒有消費(fèi)者,發(fā)布的消息將得不到處理;



      頭腦風(fēng)暴?

      本次采用的消息隊(duì)列模型:

      * ? ?解耦業(yè)務(wù):? 新建Receiver程序作為生產(chǎn)者,專注于接收并發(fā)送到隊(duì)列;原有的webapp作為消費(fèi)者專注數(shù)據(jù)處理。
      * ? ?起到削峰填谷的作用, 若建立多個(gè)消費(fèi)者webapp容器,還能形成負(fù)載均衡的效果。?
      Redis 原生支持發(fā)布/訂閱 <https://redis.io/commands/publish>模型,內(nèi)置的List數(shù)據(jù)結(jié)構(gòu)
      <https://redis.io/commands/lpush>亦能形成輕量級(jí)MQ的效果。

      ? ? 需要關(guān)注Redis 兩個(gè)命令( 左進(jìn)右出,右進(jìn)左出同理):

      ? ? LPUSH? &? RPOP/BRPOP <https://redis.io/commands/brpop>

      Brpop 中的B 表示 “Block”,
      是一個(gè)rpop命令的阻塞版本:若指定List沒有新元素,在給定時(shí)間內(nèi),該命令會(huì)阻塞當(dāng)前redis客戶端連接,直到超時(shí)返回nil

      編程實(shí)踐

      本次使用 ASPNetCore 完成RedisMQ的實(shí)踐,引入Redis國(guó)產(chǎn)第三方開源庫(kù)CSRedisCore.

      不使用著名的StackExchange.Redis 組件庫(kù)的原因:

      *
      之前一直使用StackExchange.Redis, 參考了很多資料,做了很多優(yōu)化,并未完全解決RedisTimeoutException問題?

      *
      StackExchange.Redis基于其多路復(fù)用的連接機(jī)制,不支持阻塞式命令
      <https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers>,
      故采用了 CSRedisCore,該庫(kù)強(qiáng)調(diào)了API 與Redis官方命令一致,很容易上手

      生產(chǎn)者Receiver

      ?生產(chǎn)者使用LPush?命令向Redis List數(shù)據(jù)結(jié)構(gòu)寫入消息。
      ------------------截取自Startup.cs-------------------------
      public void ConfigureServices(IServiceCollection services)
      {
        // Redis客戶端要定義成單例, 不然在大流量并發(fā)收數(shù)的時(shí)候, 會(huì)造成redis client來不及釋放。另一方面也確認(rèn)api控制器不是單例模式,
        var csredis = new
      CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");
        RedisHelper.Initialization(csredis);
        services.AddSingleton(csredis);
      services.AddMvc();
      }
      ------------------截取自數(shù)據(jù)接收Controller------------------- [Route("batch")]
      [HttpPost]public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair>
      eqidPairs) {   if (!ModelState.IsValid)   throw new ArgumentException("Http
      Body Payload Error.");   var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}";
      eqidPairs= await EqidExtractor.EqidExtractAsync(eqidPairs); if (eqidPairs !=
      null && eqidPairs.Any())     RedisHelper.LPush(redisKey, eqidPairs.ToArray());
          await Task.CompletedTask; }
      ?消費(fèi)者webapp

      ? ? ?根據(jù)以上RedisMQ思路,事件消費(fèi)方式是拉取pull,故需要輪詢Redis? List數(shù)據(jù)結(jié)構(gòu),這里使用ASPNetCore內(nèi)置的
      BackgroundService后臺(tái)服務(wù)類實(shí)現(xiàn)后臺(tái)輪詢消費(fèi)任務(wù)。
      public class BackgroundJob : BackgroundService { private readonly
      IEqidPairHandler _eqidPairHandler;private readonly CSRedisClient[]
      _cSRedisClients;private readonly IConfiguration _conf; private readonly ILogger
      _logger;public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[]
      csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory) {
      _eqidPairHandler= eqidPairHandler; _cSRedisClients = csRedisClients; _conf =
      conf; _logger= loggerFactory.CreateLogger(nameof(BackgroundJob)); } protected
      override async Task ExecuteAsync(CancellationToken stoppingToken) {
      _logger.LogInformation("Service starting"); if (_cSRedisClients[0] == null) {
      _cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + "
      ,defaultDatabase=" + 0); } RedisHelper.Initialization(_cSRedisClients[0]); while
      (!stoppingToken.IsCancellationRequested) { var key = $"
      eqidpair:{DateTime.Now.ToString("yyyyMMdd")}"; var eqidpair = RedisHelper.BRPop(
      5, key); if (eqidpair != null) await
      _eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>
      (eqidpair));// 強(qiáng)烈建議無論如何休眠一段時(shí)間,防止突發(fā)大流量導(dǎo)致webApp進(jìn)程CPU滿載,自行根據(jù)場(chǎng)景設(shè)置合理休眠時(shí)間 await
      Task.Delay(10, stoppingToken); } _logger.LogInformation("Service stopping"); } }
      ?

      最后依照引言中的部署原理圖,將Nginx,Receiver, WebApp使用docker-compose工具容器化

      根據(jù)docker-compsoe up <https://docs.docker.com/compose/reference/up/>
      命令的用法,若容器正在運(yùn)行且對(duì)應(yīng)的Service Configuration或Image并未改變,該容器不會(huì)被ReCreate;

      docker-compose? up? 命令默認(rèn)只會(huì)停止Service或Image變更的容器并重建。

      If there are existing containers for a service, and the service’s
      configuration or image was changed after the container’s creation,?
      docker-compose up?picks up the changes by stopping and recreating the
      containers (preserving mounted volumes). To prevent Compose from picking up
      changes, use the?--no-recreate?flag.

      做一次上線測(cè)試驗(yàn)證,修改docker-compose.yml文件Web?app的容器服務(wù),docker-compose up;

      僅數(shù)據(jù)處理程序WebApp容器被重建:



       Nice,分布式改造上線,效果很明顯,現(xiàn)在可以放心安全的迭代Web App數(shù)據(jù)處理程序。
      作者:JulianHuang <https://www.cnblogs.com/JulianHuang/>
      碼甲拙見,如有問題請(qǐng)下方留言大膽斧正;碼字+Visio制圖,均為原創(chuàng),看官請(qǐng)不吝好評(píng)+關(guān)注,? ~。。~

      本文歡迎轉(zhuǎn)載,請(qǐng)轉(zhuǎn)載頁(yè)面明顯位置注明原作者及原文鏈接。

      ?

      友情鏈接
      ioDraw流程圖
      API參考文檔
      OK工具箱
      云服務(wù)器優(yōu)惠
      阿里云優(yōu)惠券
      騰訊云優(yōu)惠券
      京東云優(yōu)惠券
      站點(diǎn)信息
      問題反饋
      郵箱:[email protected]
      QQ群:637538335
      關(guān)注微信

        <ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>
          成人无码做爰毛片国产 | 就要操山东老熟女的逼逼 | 在线观看黄国产 | 东京热视频在线观看 | 欧洲尺码日本尺码专线美国欧洲lv | 麻豆免费视频在线观看 | 欧美黄网站 | 女人扒开屁股爽桶30分钟的演员 | 乌克兰美女毛片 | 美女被吸乳歪歪漫画中国视频 |