前言

          Microsoft.AspNetCore.ConcurrencyLimiter
          AspNetCore3.0后增加的,用于傳入的請求進行排隊處理,避免線程池的不足.
          我們?nèi)粘i_發(fā)中可能常做的給某web服務(wù)器配置連接數(shù)以及,請求隊列大小,那么今天我們看看如何在通過中間件形式實現(xiàn)一個并發(fā)量以及隊列長度限制.

          Queue策略

          添加Nuget

          Install-Package Microsoft.AspNetCore.ConcurrencyLimiter
          public void ConfigureServices(IServiceCollection services) {
          services.AddQueuePolicy(options => { //最大并發(fā)請求數(shù) options.MaxConcurrentRequests =
          2; //請求隊列長度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); }
          public void Configure(IApplicationBuilder app, IWebHostEnvironment env) {
          //添加并發(fā)限制中間件 app.UseConcurrencyLimiter(); app.Run(async context => {
          Task.Delay(100).Wait(); // 100ms sync-over-async await
          context.Response.WriteAsync("Hello World!"); }); if (env.IsDevelopment()) {
          app.UseDeveloperExceptionPage(); } app.UseHttpsRedirection(); app.UseRouting();
          app.UseAuthorization(); app.UseEndpoints(endpoints => {
          endpoints.MapControllers(); }); }
          通過上面簡單的配置,我們就可以將他引入到我們的代碼中,從而做并發(fā)量限制,以及隊列的長度;那么問題來了,他是怎么實現(xiàn)的呢?
          public static IServiceCollection AddQueuePolicy(this IServiceCollection
          services, Action<QueuePolicyOptions> configure) {
          services.Configure(configure); services.AddSingleton<IQueuePolicy,
          QueuePolicy>(); return services; }
          QueuePolicy采用的是SemaphoreSlim信號量設(shè)計,SemaphoreSlim
          、Semaphore(信號量)支持并發(fā)多線程進入被保護代碼,對象在初始化時會指定
          最大任務(wù)數(shù)量,當線程請求訪問資源,信號量遞減,而當他們釋放時,信號量計數(shù)又遞增。
          /// <summary> /// 構(gòu)造方法(初始化Queue策略) /// </summary> /// <param
          name="options"></param> public QueuePolicy(IOptions<QueuePolicyOptions>
          options) { _maxConcurrentRequests = options.Value.MaxConcurrentRequests; if
          (_maxConcurrentRequests <= 0) { throw new
          ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must
          be a positive integer."); } _requestQueueLimit =
          options.Value.RequestQueueLimit; if (_requestQueueLimit < 0) { throw new
          ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be
          a negative number."); } //使用SemaphoreSlim來限制任務(wù)最大個數(shù) _serverSemaphore = new
          SemaphoreSlim(_maxConcurrentRequests); }
          ConcurrencyLimiterMiddleware中間件
          /// <summary> /// Invokes the logic of the middleware. /// </summary> ///
          <param name="context">The <see cref="HttpContext"/>.</param> /// <returns>A
          <see cref="Task"/> that completes when the request leaves.</returns> public
          async Task Invoke(HttpContext context) { var waitInQueueTask =
          _queuePolicy.TryEnterAsync(); // Make sure we only ever call GetResult once on
          the TryEnterAsync ValueTask b/c it resets. bool result; if
          (waitInQueueTask.IsCompleted) {
          ConcurrencyLimiterEventSource.Log.QueueSkipped(); result =
          waitInQueueTask.Result; } else { using
          (ConcurrencyLimiterEventSource.Log.QueueTimer()) { result = await
          waitInQueueTask; } } if (result) { try { await _next(context); } finally {
          _queuePolicy.OnExit(); } } else {
          ConcurrencyLimiterEventSource.Log.RequestRejected();
          ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
          context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await
          _onRejected(context); } }
          每次當我們請求的時候首先會調(diào)用_queuePolicy.TryEnterAsync(),進入該方法后先開啟一個私有l(wèi)ock鎖,再接著判斷
          總請求量是否≥(請求隊列限制的大小+最大并發(fā)請求數(shù)),如果當前數(shù)量超出了,那么我直接拋出,送你個503狀態(tài);
          if (result) { try { await _next(context); } finally { _queuePolicy.OnExit();
          } } else { ConcurrencyLimiterEventSource.Log.RequestRejected();
          ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
          context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable; await
          _onRejected(context); }
          問題來了,我這邊如果說還沒到你設(shè)置的大小呢,我這個請求沒有給你服務(wù)器造不成壓力,那么你給我處理一下吧.
          await _serverSemaphore.WaitAsync();
          異步等待進入信號量,如果沒有線程被授予對信號量的訪問權(quán)限,則進入執(zhí)行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止
          lock (_totalRequestsLock) { if (TotalRequests >= _requestQueueLimit +
          _maxConcurrentRequests) { return false; } TotalRequests++; }
          //異步等待進入信號量,如果沒有線程被授予對信號量的訪問權(quán)限,則進入執(zhí)行保護代碼;否則此線程將在此處等待,直到信號量被釋放為止 await
          _serverSemaphore.WaitAsync(); return true; }
          返回成功后那么中間件這邊再進行處理,_queuePolicy.OnExit();通過該調(diào)用進行調(diào)用_serverSemaphore.Release();
          釋放信號燈,再對總請求數(shù)遞減

          Stack策略

          再來看看另一種方法,棧策略,他是怎么做的呢?一起來看看.再附加上如何使用的代碼.
          public void ConfigureServices(IServiceCollection services) {
          services.AddStackPolicy(options => { //最大并發(fā)請求數(shù) options.MaxConcurrentRequests =
          2; //請求隊列長度限制 options.RequestQueueLimit = 1; }); services.AddControllers(); }
          通過上面的配置,我們便可以對我們的應(yīng)用程序執(zhí)行出相應(yīng)的策略.下面再來看看他是怎么實現(xiàn)的呢
          public static IServiceCollection AddStackPolicy(this IServiceCollection
          services, Action<QueuePolicyOptions> configure) {
          services.Configure(configure); services.AddSingleton<IQueuePolicy,
          StackPolicy>(); return services; }
          可以看到這次是通過StackPolicy類做的策略.來一起來看看主要的方法
          /// <summary> /// 構(gòu)造方法(初始化參數(shù)) /// </summary> /// <param
          name="options"></param> public StackPolicy(IOptions<QueuePolicyOptions>
          options) { //棧分配 _buffer = new List<ResettableBooleanCompletionSource>();
          //隊列大小 _maxQueueCapacity = options.Value.RequestQueueLimit; //最大并發(fā)請求數(shù)
          _maxConcurrentRequests = options.Value.MaxConcurrentRequests; //剩余可用空間
          _freeServerSpots = options.Value.MaxConcurrentRequests; }
          當我們通過中間件請求調(diào)用,_queuePolicy.TryEnterAsync()
          時,首先會判斷我們是否還有訪問請求次數(shù),如果_freeServerSpots>0,那么則直接給我們返回true,讓中間件直接去執(zhí)行下一步,如果當前隊列=我們設(shè)置的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留后面的請求;
          public ValueTask<bool> TryEnterAsync() { lock (_bufferLock) { if
          (_freeServerSpots > 0) { _freeServerSpots--; return _trueTask; } //
          如果隊列滿了,取消先前的請求 if (_queueLength == _maxQueueCapacity) { _hasReachedCapacity =
          true; _buffer[_head].Complete(false); _queueLength--; } var tcs =
          _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
          _cachedResettableTCS = null; if (_hasReachedCapacity || _queueLength <
          _buffer.Count) { _buffer[_head] = tcs; } else { _buffer.Add(tcs); }
          _queueLength++; // increment _head for next time _head++; if (_head ==
          _maxQueueCapacity) { _head = 0; } return tcs.GetValueTask(); } }
          當我們請求后調(diào)用_queuePolicy.OnExit();出棧,再將請求長度遞減
          public void OnExit() { lock (_bufferLock) { if (_queueLength == 0) {
          _freeServerSpots++; if (_freeServerSpots > _maxConcurrentRequests) {
          _freeServerSpots--; throw new InvalidOperationException("OnExit must only be
          called once per successful call to TryEnterAsync"); } return; } // step
          backwards and launch a new task if (_head == 0) { _head = _maxQueueCapacity -
          1; } else { _head--; } //退出,出棧 _buffer[_head].Complete(true); _queueLength--; }
          }
          總結(jié)

          基于棧結(jié)構(gòu)的特點,在實際應(yīng)用中,通常只會對棧執(zhí)行以下兩種操作:

          * 向棧中添加元素,此過程被稱為"進棧"(入?;驂簵#?;
          * 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);
          隊列存儲結(jié)構(gòu)的實現(xiàn)有以下兩種方式:

          * 順序隊列:在順序表的基礎(chǔ)上實現(xiàn)的隊列結(jié)構(gòu);
          * 鏈隊列:在鏈表的基礎(chǔ)上實現(xiàn)的隊列結(jié)構(gòu);

          友情鏈接
          ioDraw流程圖
          API參考文檔
          OK工具箱
          云服務(wù)器優(yōu)惠
          阿里云優(yōu)惠券
          騰訊云優(yōu)惠券
          京東云優(yōu)惠券
          站點信息
          問題反饋
          郵箱:[email protected]
          QQ群:637538335
          關(guān)注微信

                精品国自产拍在线 | 影音先锋资源在线观看 | 久久久久福利视频 | 一道本一区二区视频 | 一级乱伦大片 |