背景

          我司在很久之前,一位很久之前的同事寫過一個文檔轉(zhuǎn)圖片的服務(wù),具體業(yè)務(wù)如下:

          * 用戶在客戶端上傳文檔,可以是ppt,word,pdf
          等格式,用戶上傳完成可以在客戶端預(yù)覽上傳的文檔,預(yù)覽的時候采用的是圖片形式(不要和我說用別的方式預(yù)覽,現(xiàn)在已經(jīng)來不及了)
          * 當(dāng)用戶把文檔上傳到云端之后(阿里云),把文檔相關(guān)的信息記錄在數(shù)據(jù)庫,然后等待轉(zhuǎn)碼完成
          * 服務(wù)器有一個轉(zhuǎn)碼服務(wù)(其實就是一個windows
          service)不停的在輪訓(xùn)待轉(zhuǎn)碼的數(shù)據(jù),如果有待轉(zhuǎn)碼的數(shù)據(jù),則從數(shù)據(jù)庫取出來,然后根據(jù)文檔的網(wǎng)絡(luò)地址下載到本地進(jìn)行轉(zhuǎn)碼(轉(zhuǎn)成多張圖片)
          * 當(dāng)文檔轉(zhuǎn)碼完畢,把轉(zhuǎn)碼出來的圖片上傳到云端,并把云端圖片的信息記錄到數(shù)據(jù)庫
          * 客戶端有預(yù)覽需求的時候,根據(jù)數(shù)據(jù)庫來判斷有沒有轉(zhuǎn)碼成功,如果成功,則獲取數(shù)據(jù)來顯示。
          文檔預(yù)覽的整體過程如以上所說,老的轉(zhuǎn)碼服務(wù)現(xiàn)在什么問題呢?

          * 由于一個文檔同時只能被一個線程進(jìn)行轉(zhuǎn)碼操作,所以老的服務(wù)采用了把待轉(zhuǎn)碼數(shù)據(jù)劃分管道的思想,一共有六個管道,映射到數(shù)據(jù)庫大體就是 Id=》管道ID
          這個樣子。
          * 一個控制臺程序,根據(jù)配置文件信息,讀取某一個管道待轉(zhuǎn)碼的文檔,然后單線程進(jìn)行轉(zhuǎn)碼操作
          * 一共有六個管道,所以服務(wù)器上起了六個cmd的黑窗口……
          * 有的時候個別文檔由于格式問題或者其他問題 轉(zhuǎn)碼過程中會卡住,具體的表現(xiàn)為:停止了轉(zhuǎn)碼操作。
          * 如果程序卡住了,需要運維人員重新啟動轉(zhuǎn)碼cmd窗口(這種維護比較蛋疼)

          后來機緣巧合,這個程序的維護落到的菜菜頭上,維護了一周左右,大約重啟了10多次,終于忍受不了了,重新搞一個吧。仔細(xì)分析過后,刨除實際文檔轉(zhuǎn)碼的核心操作之外,整個轉(zhuǎn)碼流程其實還有很多注意點

          * 需要保證轉(zhuǎn)碼服務(wù)不被卡住,如果和以前一樣就沒有必要重新設(shè)計了
          * 盡量避免開多個進(jìn)程的方式,其實在這個業(yè)務(wù)場景下,多個進(jìn)程和多個線程作用是一致的。
          * 每個文檔只能被轉(zhuǎn)碼一次,如果一個文檔被轉(zhuǎn)碼多次,不僅浪費了服務(wù)器資源,而且還有可能會有數(shù)據(jù)不一致的情況發(fā)生
          * 轉(zhuǎn)碼失敗的文檔需要有一定次數(shù)的重試,因為一次失敗不代表第二次失敗,所以一定要給失敗的文檔再次被操作的機會
          * 因為程序不停的把文檔轉(zhuǎn)碼成本地圖片,所以需要保證這些文件在轉(zhuǎn)碼完成在服務(wù)器上刪除,不然的話,時間長了會生成很多無用的文件

          說了這么多,其實需要注意的點還是很多的。以整個的轉(zhuǎn)碼流程來說,本質(zhì)上是一個任務(wù)池的生產(chǎn)和消費問題,任務(wù)池中的任務(wù)就是待轉(zhuǎn)碼的文檔,生產(chǎn)者不停的把待轉(zhuǎn)碼文檔丟進(jìn)任務(wù)池,消費者不停的把任務(wù)池中文檔轉(zhuǎn)碼完成。

          線程池


          這很顯然和線程池很類似,菜菜之前就寫過一個線程池的文章,有興趣的同學(xué)可以去翻翻歷史。今天我們就以這個線程池來解決這個轉(zhuǎn)碼問題。線程池的本質(zhì)是初始化一定數(shù)目的線程,不停的執(zhí)行任務(wù)。
          ?//線程池定義?
          ????public?class?LXThreadPool:IDisposable
          ????{
          ????????bool?PoolEnable?=?true;?//線程池是否可用?
          ????????List<Thread>?ThreadContainer?=?null;?//線程的容器
          ????????ConcurrentQueue<ActionData>?JobContainer?=?null;?//任務(wù)的容器
          ????????int?_maxJobNumber;?//線程池最大job容量

          ????????ConcurrentDictionary<string,?DateTime>?JobIdList?=?new
          ?ConcurrentDictionary<string,?DateTime>();?//job的副本,用于排除某個job?是否在運行中


          ????????public?LXThreadPool(int?threadNumber,int?maxJobNumber=1000)
          ????????{
          ????????????if(threadNumber<=0?||?maxJobNumber?<=?0)
          ????????????{
          ????????????????throw?new?Exception("線程池初始化失敗");
          ????????????}
          ????????????_maxJobNumber?=?maxJobNumber;
          ????????????ThreadContainer?=?new?List<Thread>(threadNumber);
          ????????????JobContainer?=?new?ConcurrentQueue<ActionData>();
          ????????????for?(int?i?=?0;?i?<?threadNumber;?i++)
          ????????????{
          ????????????????var?t?=?new?Thread(RunJob);
          ????????????????t.Name?=?$"轉(zhuǎn)碼線程{i}";
          ????????????????ThreadContainer.Add(t);
          ????????????????t.Start();
          ????????????}
          ????????????//清除超時任務(wù)的線程
          ????????????var?tTimeOutJob?=?new?Thread(CheckTimeOutJob);
          ????????????tTimeOutJob.Name?=?$"清理超時任務(wù)線程";
          ????????????tTimeOutJob.Start();
          ????????}

          ????????//往線程池添加一個線程,返回線程池的新線程數(shù)
          ????????public?int?AddThread(int?number=1)
          ????????{
          ????????????if(!PoolEnable?||?ThreadContainer==null
          ?||?!ThreadContainer.Any()?||?JobContainer==null||?!JobContainer.Any())
          ????????????{
          ????????????????return?0;
          ????????????}
          ????????????while?(number?<=?0)
          ????????????{
          ????????????????var?t?=?new?Thread(RunJob);
          ????????????????ThreadContainer.Add(t);
          ????????????????t.Start();
          ????????????????number?-=?number;
          ????????????}
          ????????????return?ThreadContainer?.Count????0;
          ????????}

          ????????//向線程池添加一個任務(wù),返回0:添加任務(wù)失敗???1:成功
          ????????public?int?AddTask(Action<object>?job,?object?obj,string
          ?actionId,?Action<Exception>?errorCallBack?=?null)
          ????????{
          ????????????if?(JobContainer?!=?null)
          ????????????{
          ????????????????if(JobContainer.Count>=?_maxJobNumber)
          ????????????????{
          ????????????????????return?0;
          ????????????????}
          ????????????????//首先排除10分鐘還沒轉(zhuǎn)完的
          ????????????????var?timeoOutJobList?=?JobIdList.Where(s?=>?s.Value.AddMinutes(
          10)?<?DateTime.Now);
          ????????????????if(timeoOutJobList!=null&&?timeoOutJobList.Any())
          ????????????????{
          ????????????????????foreach?(var?timeoutJob?in?timeoOutJobList)
          ????????????????????{
          ????????????????????????JobIdList.TryRemove(timeoutJob.Key,out?DateTime?v);
          ????????????????????}
          ????????????????}

          ????????????????if?(!JobIdList.Any(s?=>?s.Key?==?actionId))
          ????????????????{
          ????????????????????if(JobIdList.TryAdd(actionId,?DateTime.Now))
          ????????????????????{
          ????????????????????????JobContainer.Enqueue(new
          ?ActionData?{?Job?=?job,?Data?=?obj,?ActionId?=?actionId,?ErrorCallBack?=?errorCallBack?});
          ????????????????????????return?1;
          ????????????????????}
          ????????????????????else
          ????????????????????{
          ????????????????????????return?101;
          ????????????????????}
          ????????????????}
          ????????????????else
          ????????????????{
          ????????????????????return?100;
          ????????????????}????????????
          ????????????}
          ????????????return?0;
          ????????}??

          ????????private?void?RunJob()
          ????????{
          ????????????while?(JobContainer?!=?null??&&?PoolEnable)
          ????????????{

          ????????????????//任務(wù)列表取任務(wù)
          ????????????????ActionData?job?=?null;
          ????????????????JobContainer?.TryDequeue(out?job);
          ????????????????if?(job?==?null)
          ????????????????{
          ????????????????????//如果沒有任務(wù)則休眠
          ????????????????????Thread.Sleep(20);
          ????????????????????continue;
          ????????????????}
          ????????????????try
          ????????????????{
          ????????????????????//執(zhí)行任務(wù)
          ????????????????????job.Job.Invoke(job.Data);
          ????????????????}
          ????????????????catch?(Exception?error)
          ????????????????{
          ????????????????????//異?;卣{(diào)
          ????????????????????if?(job?!=?null&&?job.ErrorCallBack!=null)
          ????????????????????{
          ????????????????????????job?.ErrorCallBack(error);
          ????????????????????}

          ????????????????}
          ????????????????finally
          ????????????????{
          ????????????????????if?(!JobIdList.TryRemove(job.ActionId,out?DateTime?v))
          ????????????????????{

          ????????????????????}
          ????????????????}
          ????????????}
          ????????}

          ????????//終止線程池
          ????????public?void?Dispose()
          ????????{
          ????????????PoolEnable?=?false;
          ????????????JobContainer?=?null;
          ????????????if?(ThreadContainer?!=?null)
          ????????????{
          ????????????????foreach?(var?t?in?ThreadContainer)
          ????????????????{
          ????????????????????//強制線程退出并不好,會有異常
          ????????????????????t.Join();
          ????????????????}
          ????????????????ThreadContainer?=?null;
          ????????????}
          ????????}

          ????????//清理超時的任務(wù)
          ????????private?void?CheckTimeOutJob()
          ????????{
          ????????????//首先排除10分鐘還沒轉(zhuǎn)完的
          ????????????var?timeoOutJobList?=?JobIdList.Where(s?=>?s.Value.AddMinutes(10
          )?<?DateTime.Now);
          ????????????if?(timeoOutJobList?!=?null?&&?timeoOutJobList.Any())
          ????????????{
          ????????????????foreach?(var?timeoutJob?in?timeoOutJobList)
          ????????????????{
          ????????????????????JobIdList.TryRemove(timeoutJob.Key,?out?DateTime?v);
          ????????????????}
          ????????????}
          ????????????System.Threading.Thread.Sleep(60000);
          ????????}
          ????}
          ????public?class?ActionData
          ????{
          ????????//任務(wù)的id,用于排重
          ????????public?string?ActionId?{?get;?set;?}
          ????????//執(zhí)行任務(wù)的參數(shù)
          ????????public?object?Data?{?get;?set;?}
          ????????//執(zhí)行的任務(wù)
          ????????public?Action<object>?Job?{?get;?set;?}
          ????????//發(fā)生異常時候的回調(diào)方法
          ????????public?Action<Exception>?ErrorCallBack?{?get;?set;?}
          ????}


          以上就是一個線程池的具體實現(xiàn),和具體的業(yè)務(wù)無關(guān),完全可以用于任何適用于線程池的場景,其中有一個注意點,我新加了任務(wù)的標(biāo)示,主要用于排除重復(fù)的任務(wù)被投放多次(只排除正在運行中的任務(wù))。當(dāng)然代碼不是最優(yōu)的,有需要的同學(xué)可以自己去優(yōu)化

          使用線程池


          接下來,我們利用以上的線程池來完成我們的文檔轉(zhuǎn)碼任務(wù),首先我們啟動的時候初始化一個線程池,并啟動一個獨立線程來不停的往線程池來輸送任務(wù),順便起了一個監(jiān)控線程去監(jiān)視發(fā)送任務(wù)的線程
          ???????string?lastResId?=?null;
          ????????string?lastErrorResId?=?null;

          ????????Dictionary<string,?int>?ResErrNumber?=?new?Dictionary<string,?int>();?
          //轉(zhuǎn)碼失敗的資源重試次數(shù)
          ????????int?MaxErrNumber?=?5;//最多轉(zhuǎn)碼錯誤的資源10次
          ????????Thread?tPutJoj?=?null;
          ????????LXThreadPool?pool?=?new?LXThreadPool(4,100);
          ????????public?void?OnStart()
          ????????{
          ????????????//初始化一個線程發(fā)送轉(zhuǎn)碼任務(wù)
          ????????????tPutJoj?=?new?Thread(PutJob);
          ????????????tPutJoj.IsBackground?=?true;
          ????????????tPutJoj.Start();

          ????????????//初始化?監(jiān)控線程
          ????????????var?tMonitor?=?new?Thread(MonitorPutJob);
          ????????????tMonitor.IsBackground?=?true;
          ????????????tMonitor.Start();
          ????????}
          ???????//監(jiān)視發(fā)放job的線程
          ????????private?void?MonitorPutJob()
          ????????{
          ????????????while?(true)
          ????????????{
          ????????????????if(tPutJoj?==?null||?!tPutJoj.IsAlive)
          ????????????????{
          ????????????????????Log.Error($"發(fā)送轉(zhuǎn)碼任務(wù)線程停止==========");
          ????????????????????tPutJoj?=?new?Thread(PutJob);
          ????????????????????tPutJoj.Start();
          ????????????????????Log.Error($"發(fā)送轉(zhuǎn)碼任務(wù)線程重新初始化并啟動==========");
          ????????????????}
          ????????????????System.Threading.Thread.Sleep(5000);
          ????????????}

          ????????}

          ????????private?void?PutJob()
          ????????{???????????
          ????????????while?(true)
          ????????????{
          ????????????????try
          ????????????????{
          ????????????????????//先搜索等待轉(zhuǎn)碼的
          ????????????????????var?fileList?=?DocResourceRegisterProxy.GetFileList(new?int
          []?{?(int)FileToImgStateEnum.Wait?},?30,?lastResId);
          ????????????????????Log.Error($"拉取待轉(zhuǎn)碼記錄===總數(shù):lastResId:{lastResId},結(jié)果:
          {fileList?.Count()????0}");
          ????????????????????if?(fileList?==?null?||?!fileList.Any())
          ????????????????????{
          ????????????????????????lastResId?=?null;
          ????????????????????????Log.Error($"待轉(zhuǎn)碼數(shù)量為0,開始拉取轉(zhuǎn)碼失敗記錄,重新轉(zhuǎn)碼==========");
          ????????????????????????//如果無待轉(zhuǎn),則把出錯的?嘗試
          ????????????????????????fileList?=?DocResourceRegisterProxy.GetFileList(new?int
          []?{?(int)FileToImgStateEnum.Error,?(int)FileToImgStateEnum.TimeOut,?(int
          )FileToImgStateEnum.Fail?},?1,?lastErrorResId);
          ????????????????????????if?(fileList?==?null?||?!fileList.Any())
          ????????????????????????{
          ????????????????????????????lastErrorResId?=?null;
          ????????????????????????}
          ????????????????????????else
          ????????????????????????{
          ????????????????????????????
          //?Log.Error($"開始轉(zhuǎn)碼失敗記錄:{JsonConvert.SerializeObject(fileList)}");
          ????????????????????????????List<DocResourceRegister>?errFilter?=?new
          ?List<DocResourceRegister>();
          ????????????????????????????foreach?(var?errRes?in?fileList)
          ????????????????????????????{
          ????????????????????????????????if?(ResErrNumber.TryGetValue(errRes.res_id,?out
          ?int?number))
          ????????????????????????????????{
          ????????????????????????????????????if?(number?>?MaxErrNumber)
          ????????????????????????????????????{
          ????????????????????????????????????????Log.Error($"資源:{errRes.res_id}?轉(zhuǎn)了
          {MaxErrNumber}次不成功,放棄===========");
          ????????????????????????????????????????continue;
          ????????????????????????????????????}
          ????????????????????????????????????else
          ????????????????????????????????????{
          ????????????????????????????????????????errFilter.Add(errRes);
          ????????????????????????????????????????ResErrNumber[errRes.res_id]?=?number?+?
          1;
          ????????????????????????????????????}
          ????????????????????????????????}
          ????????????????????????????????else
          ????????????????????????????????{
          ????????????????????????????????????ResErrNumber.Add(errRes.res_id,?1);
          ????????????????????????????????????errFilter.Add(errRes);
          ????????????????????????????????}
          ????????????????????????????}
          ????????????????????????????fileList?=?errFilter;
          ????????????????????????????if?(fileList.Any())
          ????????????????????????????{

          ????????????????????????????????lastErrorResId?=?fileList.Select(s?=>?s.res_id).Max();
          ????????????????????????????}
          ????????????????????????}
          ????????????????????}
          ????????????????????else
          ????????????????????{
          ????????????????????????lastResId?=?fileList.Select(s?=>?s.res_id).Max();
          ????????????????????}

          ????????????????????if?(fileList?!=?null?&&?fileList.Any())
          ????????????????????{
          ????????????????????????foreach?(var?file?in?fileList)
          ????????????????????????{
          ????????????????????????????//如果?任務(wù)投放線程池失敗,則等待一面繼續(xù)投放
          ????????????????????????????int?poolRet?=?0;
          ????????????????????????????while?(poolRet?<=?0)
          ????????????????????????????{
          ????????????????????????????????poolRet?=?pool.AddTask(s?=>?{
          ????????????????????????????????????AliFileService.ConvertToImg(file.res_id?+?
          $".{file.res_ext}",?FileToImgFac.Instance(file.res_ext));
          ????????????????????????????????},?file,?file.res_id);
          ????????????????????????????????if?(poolRet?<=?0?||?poolRet?>?1)
          ????????????????????????????????{
          ????????????????????????????????????Log.Error($"發(fā)放轉(zhuǎn)碼任務(wù)失敗==========線程池返回結(jié)果:
          {poolRet}");
          ????????????????????????????????????System.Threading.Thread.Sleep(1000);
          ????????????????????????????????}
          ????????????????????????????}
          ????????????????????????}
          ????????????????????}
          ????????????????????//每一秒去數(shù)據(jù)庫取一次數(shù)據(jù)
          ????????????????????System.Threading.Thread.Sleep(3000);
          ????????????????}
          ????????????????catch
          ????????????????{
          ????????????????????continue;
          ????????????????}

          ????????????}
          ????????}


          以上就是發(fā)放任務(wù),線程池執(zhí)行任務(wù)的所有代碼,由于具體的轉(zhuǎn)碼代碼涉及到隱私,這里不在提供,如果有需要可以私下找菜菜索要,雖然我深知還有更優(yōu)的方式,但是我覺得線程池這樣的思想可能會對部分人有幫助,其中任務(wù)超時的核心代碼如下(采用了polly插件):
          ?var?policy=?Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut),?onTimeout:?(
          context,?timespan,?task)?=>
          ????????????????{

          ????????????????????ret.State=Enum.FileToImgStateEnum.TimeOut;???????????????????
          ????????????????});
          ????????????????policy.Execute(s=>{
          ????????????????????.....
          ????????????????});

          把你的更優(yōu)方案寫在留言區(qū)吧,2020年大家越來越好

          友情鏈接
          ioDraw流程圖
          API參考文檔
          OK工具箱
          云服務(wù)器優(yōu)惠
          阿里云優(yōu)惠券
          騰訊云優(yōu)惠券
          京東云優(yōu)惠券
          站點信息
          問題反饋
          郵箱:[email protected]
          QQ群:637538335
          關(guān)注微信

                和尚与寡妇在线三级 | 免费一级网站 | 爱豆传媒成人A片免费看 | 国产一区二区在线免费观看 | 中国性老太HD大全69 |