<ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>


      回顧上文 <https://www.cnblogs.com/JulianHuang/p/10919346.html>

        作為單體程序,依賴的第三方服務(wù)雖不多,但是2C的程序還是有不少內(nèi)容可講; 作為一個常規(guī)互聯(lián)網(wǎng)系統(tǒng),無外乎就是接受請求、處理請求,輸出響應(yīng)。

      由于業(yè)務(wù)漸漸增長,單機多核的共享內(nèi)存模式帶來的問題很多,編程也困難,隨著多核時代和分布式系統(tǒng)的到來,共享模型已經(jīng)不太適合并發(fā)編程,因此Actor模型
      又重新受到了人們的重視。



      -----調(diào)試多線程都懂------

      * 傳統(tǒng)的編程模型通常使用回調(diào)和同步對象(如鎖)來協(xié)調(diào)任務(wù)和訪問共享數(shù)據(jù),??從宏觀看傳統(tǒng)模型: 任務(wù)是一步步緊接著完成的,資源是需要搶占的。

      * Actor模式是一種并發(fā)模型,與另一種模型共享內(nèi)存完全相反,Actor模型share nothing。所有的線程(或進程)通過消息傳遞
      的方式進行合作,這些線程(或進程)稱為Actor,預(yù)先定義了任務(wù)的流水線后,不關(guān)注數(shù)據(jù)什么時候流到這個任務(wù)?,專注完成工序任務(wù)。

      https://www.cnblogs.com/csguo/p/7521322.html

      ? .Net TPL? Dataflow組件幫助我們快速實現(xiàn)Actor模型。



      ?

      TPL Dataflow是微軟前幾年給出的數(shù)據(jù)處理庫, 內(nèi)置常見的處理塊,可將這些塊組裝成一個處理管道,"塊"對應(yīng)處理管道中的"階段",
      可類比AspNetCore 中Middleware 和pipeline.。

      *
      TPL Dataflow庫為消息傳遞和并行化CPU密集型和I /
      O密集型應(yīng)用程序提供了編程基礎(chǔ),這些應(yīng)用程序具有高吞吐量和低延遲。它還可以讓您明確控制數(shù)據(jù)的緩沖方式并在系統(tǒng)中移動。

      * 為了更好地理解數(shù)據(jù)流編程模型,請考慮從磁盤異步加載圖像并創(chuàng)建這些圖像的應(yīng)用程序。
      *
      ? 傳統(tǒng)的編程模型通常使用回調(diào)和同步對象(如鎖)來協(xié)調(diào)任務(wù)和訪問共享數(shù)據(jù),

      *
      ? 通過使用數(shù)據(jù)流編程模型,您可以創(chuàng)建在從磁盤讀取圖像時處理圖像的數(shù)據(jù)流對象。在數(shù)據(jù)流模型下,您可以聲明數(shù)據(jù)在可用時的處理方式以及數(shù)據(jù)之間的依賴關(guān)系。?由于
      運行時
      管理數(shù)據(jù)之間的依賴關(guān)系,因此通??梢员苊馔皆L問共享數(shù)據(jù)的要求。此外,由于運行時調(diào)度基于數(shù)據(jù)的異步到達而工作,因此數(shù)據(jù)流可以通過有效地管理底層線程來提高響應(yīng)性和吞吐量?! ?br>
      * ? ?需要注意的是:TPL Dataflow 非分布式數(shù)據(jù)流,消息在進程內(nèi)傳遞,?
      ?使用nuget引用?System.Threading.Tasks.Dataflow 包。
      TPL Dataflow 核心概念

      ?1.? Buffer & Block

      TPL Dataflow 內(nèi)置的Block覆蓋了常見的應(yīng)用場景,當然如果內(nèi)置塊不能滿足你的要求,你也可以自定“塊”。

      Block可以劃分為下面3類:

      *
      Buffering Only? ? 【Buffer不是緩存Cache的概念, 而是一個緩沖區(qū)的概念】

      *
      Execution

      *
      Grouping?

      使用以上塊混搭處理管道, 大多數(shù)的塊都會執(zhí)行一個操作,有些時候需要將消息分發(fā)到不同Block,這時可使用特殊類型的緩沖塊給管道“”分叉”。

      2. Execution Block
        可執(zhí)行的塊有兩個核心組件:
      *
      輸入、輸出消息的緩沖區(qū)(一般稱為Input,Output隊列)

      *
      在消息上執(zhí)行動作的委托




        消息在輸入和輸出時能夠被緩沖:當Func委托的運行速度比輸入的消息速度慢時,后續(xù)消息將在到達時進行緩沖;當下一個塊的輸入緩沖區(qū)中沒有容量時,將在輸出時緩沖。

      每個塊我們可以配置:

      *
      緩沖區(qū)的總?cè)萘浚?默認無上限

      *
      執(zhí)行操作委托的并發(fā)度, 默認情況下塊按照順序處理消息,一次一個。

      我們將塊鏈接在一起形成一個處理管道,生產(chǎn)者將消息推向管道。

      TPL Dataflow有一個基于pull的機制(使用Receive和TryReceive方法),但我們將在管道中使用塊連接和推送機制。

      *
      TransformBlock(Execution category)-- 由輸入輸出緩沖區(qū)和一個Func<TInput,
      TOutput>委托組成,消費的每個消息,都會輸出另外一個,你可以使用這個Block去執(zhí)行輸入消息的轉(zhuǎn)換,或者轉(zhuǎn)發(fā)輸出的消息到另外一個Block。

      *
      TransformManyBlock (Execution category) -- 由輸入輸出緩沖區(qū)和一個Func<TInput,
      IEnumerable<TOutput>>委托組成, 它為輸入的每個消息輸出一個 IEnumerable<TOutput>

      *
      BroadcastBlock (Buffering category)-- 由只容納1個消息的緩沖區(qū)和Func<T,
      T>委托組成。緩沖區(qū)被每個新傳入的消息所覆蓋,委托僅僅為了讓你控制怎樣克隆這個消息,不做消息轉(zhuǎn)換。

                  該塊可以鏈接到多個塊(管道的分叉),雖然它一次只緩沖一條消息,但它一定會在該消息被覆蓋之前將該消息轉(zhuǎn)發(fā)到鏈接塊(鏈接塊還有緩沖區(qū))。

      *
      ActionBlock (Execution category)--
      由緩沖區(qū)和Action<T>委托組成,他們一般是管道的結(jié)尾,他們不再給其他塊轉(zhuǎn)發(fā)消息,他們只會處理輸入的消息。

      *
      BatchBlock (Grouping category)--
      告訴它你想要的每個批處理的大小,它將累積消息,直到它達到那個大小,然后將它作為一組消息轉(zhuǎn)發(fā)到下一個塊。


        還有一下其他的Block類型:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,我們暫時不會深入。

      3. Pipeline Chain React


        當輸入緩沖區(qū)達到上限容量,為其供貨的上游塊的輸出緩沖區(qū)將開始填充,當輸出緩沖區(qū)已滿時,該塊必須暫停處理,直到緩沖區(qū)有空間,這意味著一個Block的處理瓶頸可能導(dǎo)致所有前面的塊的緩沖區(qū)被填滿。

        但是不是所有的塊變滿時,都會暫停,BroadcastBlock 有允許1個消息的緩沖區(qū),每個消息都會被覆蓋,
      因此如果這個廣播塊不能將消息轉(zhuǎn)發(fā)到下游,則在下個消息到達的時候消息將丟失,這在某種意義上是一種限流(比較生硬).

      編程實踐



      ?

      ① 生產(chǎn)者投遞消息

      ? ?可使用Post或者SendAsync 方法向首塊投遞消息

      *
      Post方法即時返回true/false, True意味著消息被block接收(緩沖區(qū)有空余),
      false意味著拒絕了消息(緩沖區(qū)已滿或者Block已經(jīng)出錯了)。

      *
      SendAsync方法返回一個Task<bool>, 將會以異步的方式阻塞直到塊接收、拒絕、塊出錯。
      Post、SendAsync的不同點在于SendAsync可以延遲投遞(下一管道的輸入buffer不空,可稍后投遞消息)。 ? ② 定義流水線 public
      EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache,
      IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory) {
      _httpClient= httpClientFactory.CreateClient("bce-request"); _redisDB0 =
      redisCache[0]; _redisDB = redisCache; _logger =
      loggerFactory.CreateLogger(nameof(EqidPairHandler));var option = new
      DataflowLinkOptions { PropagateCompletion =true }; publisher =
      _redisDB.RedisConnection.GetSubscriber();_eqid2ModelTransformBlock = new
      TransformBlock<EqidPair, EqidModel> ( // redis piublih 沒有做在TransformBlock
      fun里面, 因為publih失敗可能影響后續(xù)的block傳遞 eqidPair => EqidResolverAsync(eqidPair), new
      ExecutionDataflowBlockOptions { MaxDegreeOfParallelism= con.GetValue<int>("
      MaxDegreeOfParallelism") } ); //
      https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline
      _logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);
      _logPublishBlock= new ActionBlock<EqidModel>(x => PublishAsync(x) );
      _broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容納一個消息的緩存區(qū)和拷貝函數(shù)組成
      _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);
      _broadcastBlock.LinkTo(_logPublishBlock, option);
      _eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option); } public class
      LogBatchBlock<T> : ILogDestination<T>where T : IModelBase { private readonly
      string _dirPath; private readonly Timer _triggerBatchTimer; private readonly
      Timer _openFileTimer;private DateTime? _nextCheckpoint; private TextWriter
      _currentWriter;private readonly LogHead _logHead; private readonly object
      _syncRoot =new object(); private readonly ILogger _logger; private readonly
      BatchBlock<T> _packer; private readonly ActionBlock<T[]> batchWriterBlock;
      private readonly TimeSpan _logFileIntervalTimeSpan; /// <summary> /// Generate
      request log file./// </summary> public LogBatchBlock(LogConfig logConfig,
      ILoggerFactory loggerFactory) { _logger=
      loggerFactory.CreateLogger<LogBatchBlock<T>>(); _dirPath = logConfig.DirPath; if
      (!Directory.Exists(_dirPath)) { Directory.CreateDirectory(_dirPath); } _logHead
      = logConfig.LogHead; _packer = new BatchBlock<T>(logConfig.BatchSize);
      batchWriterBlock= new ActionBlock<T[]>(models => WriteToFile(models)); //
      形成pipeline必須放在LinkTo前面 _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions
      { PropagateCompletion =true }); // 防止BatchPacker一直不滿足10條數(shù)據(jù),無法打包,故設(shè)定間隔15s強制寫入
      _triggerBatchTimer =new Timer(state => { _packer.TriggerBatch(); }, null,
      TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));//
      實時寫文件流能確保隨時生成文件,但存在極端情況:某小時沒有需要寫入的數(shù)據(jù),導(dǎo)致該小時不會創(chuàng)建文件,以下定時任務(wù)確保創(chuàng)建文件
      _logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval);
      _openFileTimer= new Timer(state => { AlignCurrentFileTo(DateTime.Now); }, null,
      TimeSpan.Zero, _logFileIntervalTimeSpan); }public ITargetBlock<T> InputBlock =>
      _packer;private void AlignCurrentFileTo(DateTime dt) { if (!
      _nextCheckpoint.HasValue) { OpenFile(dt); }if (dt >= _nextCheckpoint.Value) {
      CloseFile(); OpenFile(dt); } }private void OpenFile(DateTime now, string
      fileSuffix =null) { string filePath = null; try { var currentHour =
      now.Date.AddHours(now.Hour); _nextCheckpoint=
      currentHour.Add(_logFileIntervalTimeSpan);int hourConfiguration =
      _logFileIntervalTimeSpan.Hours;int minuteConfiguration =
      _logFileIntervalTimeSpan.Minutes; filePath= $"{_dirPath}/u_ex{now.ToString("
      yyMMddHH")}{fileSuffix}.log"; var appendHead = !File.Exists(filePath); if
      (filePath !=null) { var stream = new FileStream(filePath, FileMode.Append,
      FileAccess.Write);var sw = new StreamWriter(stream, Encoding.Default); if
      (appendHead) { sw.Write(GenerateHead()); } _currentWriter= sw;
      _logger.LogDebug($"{DateTime.Now} TextWriter has been created."); } } catch
      (Exception e) {if (fileSuffix == null) { _logger.LogWarning($"OpenFile
      failed:{e.StackTrace.ToString()}:{e.Message}." ); OpenFile(now, $"
      -{Guid.NewGuid()}"); } else { _logger.LogError($"OpenFile failed after retry:
      {filePath}", e); } } } private void CloseFile() { if (_currentWriter != null) {
      _currentWriter.Flush(); _currentWriter.Dispose(); _currentWriter= null;
      _logger.LogDebug($"{DateTime.Now} TextWriter has been disposed."); }
      _nextCheckpoint= null; } private string GenerateHead() { StringBuilder head =
      new StringBuilder(); head.AppendLine("#Software: " + _logHead.Software)
      .AppendLine("#Version: " + _logHead.Version) .AppendLine($"#Date:
      {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}") .AppendLine("#Fields: " +
      _logHead.Fields);return head.ToString(); } private void WriteToFile(T[] models)
      {try { lock (_syncRoot) { var flag = false; foreach (var model in models) { if
      (model ==null) continue; flag = true;
      AlignCurrentFileTo(model.ServerLocalTime);
      _currentWriter.WriteLine(model.ToString()); }if (flag) _currentWriter.Flush();
      } }catch (Exception ex) { _logger.LogError("WriteToFile Error : {0}",
      ex.Message); } }public bool AcceptLogModel(T model) { return
      _packer.Post(model); }public string GetDirPath() { return _dirPath; } public
      async Task CompleteAsync() { _triggerBatchTimer.Dispose();
      _openFileTimer.Dispose(); _packer.TriggerBatch(); _packer.Complete();await
      InputBlock.Completion;lock (_syncRoot) { CloseFile(); } } } 仿IIS日志寫入組件
      ?

      ?注意事項 :異常處理

        上述程序在部署時就遇到相關(guān)的坑位,在測試環(huán)境_eqid2ModelTransformBlock?內(nèi)Func委托穩(wěn)定執(zhí)行,程序并未出現(xiàn)異樣;

        部署到生產(chǎn)之后, 該Pipeline運行一段時間就停止工作,一直很困惑, 后來通過監(jiān)測
      _eqid2ModelTransformBlock.Completion 屬性,發(fā)現(xiàn)該塊在執(zhí)行某次Func委托時報錯,提前進入完成態(tài)
      <https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.fault?redirectedfrom=MSDN&view=netcore-2.2#System_Threading_Tasks_Dataflow_IDataflowBlock_Fault_System_Exception_>

      ? ? ? ?官方資料
      <https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.fault?redirectedfrom=MSDN&view=netcore-2.2#System_Threading_Tasks_Dataflow_IDataflowBlock_Fault_System_Exception_>
      表明: 某塊進入Fault、Cancel狀態(tài),都會導(dǎo)致該塊提前進入“完成態(tài)”,但因Fault、Cancle進入的“完成態(tài)”會導(dǎo)致
      輸入buffer和輸出buffer 被清空。

      ? ? ? ??  After Fault has been called on a dataflow block, that block will
      complete, and its Completion task will enter a final state. Faulting a block,
      as with canceling a block, causes buffered messages (unprocessed input messages
      as well as unoffered output messages) to be lost.

      ?

      當TPL Dataflow不再處理消息并且能保證不再處理消息的時候,就被定義為
      "完成態(tài)",?IDataflow.Completion屬性(Task對象)標記了該狀態(tài),
      ?Task對象的TaskStatus枚舉值描述了此Block進入完成態(tài)的真實
      <https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.dataflow.idataflowblock.completion?view=netcore-2.2>
      原因


      - TaskStatus.RanToCompletion   ? ?"成功完成" 在Block中定義的任務(wù)??

      - TaskStatus.Fault? ? ? ? ? ? ? ? ? ?     因未處理的異常? 導(dǎo)致"過早的完成"

      - TaskStatus.Cancled? ? ? ? ? ? ? ?    因取消操作??導(dǎo)致 "過早的完成"

        故需要小心處理異常, 一般情況下我們使用try、catch包含所有的執(zhí)行代碼以確保所有的異常都被處理。

      ?

      ??? 本文作為TPL
      Dataflow的入門指南,微軟技術(shù)棧的同事可持續(xù)關(guān)注這個基于Actor模型的流水線處理組件,處理單體程序中高并發(fā),低延遲場景相當巴適。

      ?
      作者:JulianHuang <https://www.cnblogs.com/JulianHuang/>
      碼甲拙見,如有問題請下方留言大膽斧正;碼字+Visio制圖,均為原創(chuàng),看官請不吝好評+關(guān)注,??~。。~

      本文歡迎轉(zhuǎn)載,請轉(zhuǎn)載頁面明顯位置注明原作者及原文鏈接。
      ?

      友情鏈接
      ioDraw流程圖
      API參考文檔
      OK工具箱
      云服務(wù)器優(yōu)惠
      阿里云優(yōu)惠券
      騰訊云優(yōu)惠券
      京東云優(yōu)惠券
      站點信息
      問題反饋
      郵箱:[email protected]
      QQ群:637538335
      關(guān)注微信

        <ul id="qxxfc"><fieldset id="qxxfc"><tr id="qxxfc"></tr></fieldset></ul>
          成年女人十八免费看 | 国产麻豆视频 | 久久六视频 | 黄色外国网站 | 18禁成人福利网站视频播放 | 精品传媒一区二区 | 国产激情视频在线观看 | 国产福利视频一区二区 | 日韩欧美国产综合 | 夜夜躁狠狠躁日日 |