背景
我司在很久之前,一位很久之前的同事寫過一個文檔轉(zhuǎn)圖片的服務(wù),具體業(yè)務(wù)如下:
* 用戶在客戶端上傳文檔,可以是ppt,word,pdf
等格式,用戶上傳完成可以在客戶端預(yù)覽上傳的文檔,預(yù)覽的時候采用的是圖片形式(不要和我說用別的方式預(yù)覽,現(xiàn)在已經(jīng)來不及了)
* 當(dāng)用戶把文檔上傳到云端之后(阿里云),把文檔相關(guān)的信息記錄在數(shù)據(jù)庫,然后等待轉(zhuǎn)碼完成
* 服務(wù)器有一個轉(zhuǎn)碼服務(wù)(其實就是一個windows
service)不停的在輪訓(xùn)待轉(zhuǎn)碼的數(shù)據(jù),如果有待轉(zhuǎn)碼的數(shù)據(jù),則從數(shù)據(jù)庫取出來,然后根據(jù)文檔的網(wǎng)絡(luò)地址下載到本地進(jìn)行轉(zhuǎn)碼(轉(zhuǎn)成多張圖片)
* 當(dāng)文檔轉(zhuǎn)碼完畢,把轉(zhuǎn)碼出來的圖片上傳到云端,并把云端圖片的信息記錄到數(shù)據(jù)庫
* 客戶端有預(yù)覽需求的時候,根據(jù)數(shù)據(jù)庫來判斷有沒有轉(zhuǎn)碼成功,如果成功,則獲取數(shù)據(jù)來顯示。
文檔預(yù)覽的整體過程如以上所說,老的轉(zhuǎn)碼服務(wù)現(xiàn)在什么問題呢?
* 由于一個文檔同時只能被一個線程進(jìn)行轉(zhuǎn)碼操作,所以老的服務(wù)采用了把待轉(zhuǎn)碼數(shù)據(jù)劃分管道的思想,一共有六個管道,映射到數(shù)據(jù)庫大體就是 Id=》管道ID
這個樣子。
* 一個控制臺程序,根據(jù)配置文件信息,讀取某一個管道待轉(zhuǎn)碼的文檔,然后單線程進(jìn)行轉(zhuǎn)碼操作
* 一共有六個管道,所以服務(wù)器上起了六個cmd的黑窗口……
* 有的時候個別文檔由于格式問題或者其他問題 轉(zhuǎn)碼過程中會卡住,具體的表現(xiàn)為:停止了轉(zhuǎn)碼操作。
* 如果程序卡住了,需要運維人員重新啟動轉(zhuǎn)碼cmd窗口(這種維護比較蛋疼)
后來機緣巧合,這個程序的維護落到的菜菜頭上,維護了一周左右,大約重啟了10多次,終于忍受不了了,重新搞一個吧。仔細(xì)分析過后,刨除實際文檔轉(zhuǎn)碼的核心操作之外,整個轉(zhuǎn)碼流程其實還有很多注意點
* 需要保證轉(zhuǎn)碼服務(wù)不被卡住,如果和以前一樣就沒有必要重新設(shè)計了
* 盡量避免開多個進(jìn)程的方式,其實在這個業(yè)務(wù)場景下,多個進(jìn)程和多個線程作用是一致的。
* 每個文檔只能被轉(zhuǎn)碼一次,如果一個文檔被轉(zhuǎn)碼多次,不僅浪費了服務(wù)器資源,而且還有可能會有數(shù)據(jù)不一致的情況發(fā)生
* 轉(zhuǎn)碼失敗的文檔需要有一定次數(shù)的重試,因為一次失敗不代表第二次失敗,所以一定要給失敗的文檔再次被操作的機會
* 因為程序不停的把文檔轉(zhuǎn)碼成本地圖片,所以需要保證這些文件在轉(zhuǎn)碼完成在服務(wù)器上刪除,不然的話,時間長了會生成很多無用的文件
說了這么多,其實需要注意的點還是很多的。以整個的轉(zhuǎn)碼流程來說,本質(zhì)上是一個任務(wù)池的生產(chǎn)和消費問題,任務(wù)池中的任務(wù)就是待轉(zhuǎn)碼的文檔,生產(chǎn)者不停的把待轉(zhuǎn)碼文檔丟進(jìn)任務(wù)池,消費者不停的把任務(wù)池中文檔轉(zhuǎn)碼完成。
線程池
這很顯然和線程池很類似,菜菜之前就寫過一個線程池的文章,有興趣的同學(xué)可以去翻翻歷史。今天我們就以這個線程池來解決這個轉(zhuǎn)碼問題。線程池的本質(zhì)是初始化一定數(shù)目的線程,不停的執(zhí)行任務(wù)。
?//線程池定義?
????public?class?LXThreadPool:IDisposable
????{
????????bool?PoolEnable?=?true;?//線程池是否可用?
????????List<Thread>?ThreadContainer?=?null;?//線程的容器
????????ConcurrentQueue<ActionData>?JobContainer?=?null;?//任務(wù)的容器
????????int?_maxJobNumber;?//線程池最大job容量
????????ConcurrentDictionary<string,?DateTime>?JobIdList?=?new
?ConcurrentDictionary<string,?DateTime>();?//job的副本,用于排除某個job?是否在運行中
????????public?LXThreadPool(int?threadNumber,int?maxJobNumber=1000)
????????{
????????????if(threadNumber<=0?||?maxJobNumber?<=?0)
????????????{
????????????????throw?new?Exception("線程池初始化失敗");
????????????}
????????????_maxJobNumber?=?maxJobNumber;
????????????ThreadContainer?=?new?List<Thread>(threadNumber);
????????????JobContainer?=?new?ConcurrentQueue<ActionData>();
????????????for?(int?i?=?0;?i?<?threadNumber;?i++)
????????????{
????????????????var?t?=?new?Thread(RunJob);
????????????????t.Name?=?$"轉(zhuǎn)碼線程{i}";
????????????????ThreadContainer.Add(t);
????????????????t.Start();
????????????}
????????????//清除超時任務(wù)的線程
????????????var?tTimeOutJob?=?new?Thread(CheckTimeOutJob);
????????????tTimeOutJob.Name?=?$"清理超時任務(wù)線程";
????????????tTimeOutJob.Start();
????????}
????????//往線程池添加一個線程,返回線程池的新線程數(shù)
????????public?int?AddThread(int?number=1)
????????{
????????????if(!PoolEnable?||?ThreadContainer==null
?||?!ThreadContainer.Any()?||?JobContainer==null||?!JobContainer.Any())
????????????{
????????????????return?0;
????????????}
????????????while?(number?<=?0)
????????????{
????????????????var?t?=?new?Thread(RunJob);
????????????????ThreadContainer.Add(t);
????????????????t.Start();
????????????????number?-=?number;
????????????}
????????????return?ThreadContainer?.Count????0;
????????}
????????//向線程池添加一個任務(wù),返回0:添加任務(wù)失敗???1:成功
????????public?int?AddTask(Action<object>?job,?object?obj,string
?actionId,?Action<Exception>?errorCallBack?=?null)
????????{
????????????if?(JobContainer?!=?null)
????????????{
????????????????if(JobContainer.Count>=?_maxJobNumber)
????????????????{
????????????????????return?0;
????????????????}
????????????????//首先排除10分鐘還沒轉(zhuǎn)完的
????????????????var?timeoOutJobList?=?JobIdList.Where(s?=>?s.Value.AddMinutes(
10)?<?DateTime.Now);
????????????????if(timeoOutJobList!=null&&?timeoOutJobList.Any())
????????????????{
????????????????????foreach?(var?timeoutJob?in?timeoOutJobList)
????????????????????{
????????????????????????JobIdList.TryRemove(timeoutJob.Key,out?DateTime?v);
????????????????????}
????????????????}
????????????????if?(!JobIdList.Any(s?=>?s.Key?==?actionId))
????????????????{
????????????????????if(JobIdList.TryAdd(actionId,?DateTime.Now))
????????????????????{
????????????????????????JobContainer.Enqueue(new
?ActionData?{?Job?=?job,?Data?=?obj,?ActionId?=?actionId,?ErrorCallBack?=?errorCallBack?});
????????????????????????return?1;
????????????????????}
????????????????????else
????????????????????{
????????????????????????return?101;
????????????????????}
????????????????}
????????????????else
????????????????{
????????????????????return?100;
????????????????}????????????
????????????}
????????????return?0;
????????}??
????????private?void?RunJob()
????????{
????????????while?(JobContainer?!=?null??&&?PoolEnable)
????????????{
????????????????//任務(wù)列表取任務(wù)
????????????????ActionData?job?=?null;
????????????????JobContainer?.TryDequeue(out?job);
????????????????if?(job?==?null)
????????????????{
????????????????????//如果沒有任務(wù)則休眠
????????????????????Thread.Sleep(20);
????????????????????continue;
????????????????}
????????????????try
????????????????{
????????????????????//執(zhí)行任務(wù)
????????????????????job.Job.Invoke(job.Data);
????????????????}
????????????????catch?(Exception?error)
????????????????{
????????????????????//異?;卣{(diào)
????????????????????if?(job?!=?null&&?job.ErrorCallBack!=null)
????????????????????{
????????????????????????job?.ErrorCallBack(error);
????????????????????}
????????????????}
????????????????finally
????????????????{
????????????????????if?(!JobIdList.TryRemove(job.ActionId,out?DateTime?v))
????????????????????{
????????????????????}
????????????????}
????????????}
????????}
????????//終止線程池
????????public?void?Dispose()
????????{
????????????PoolEnable?=?false;
????????????JobContainer?=?null;
????????????if?(ThreadContainer?!=?null)
????????????{
????????????????foreach?(var?t?in?ThreadContainer)
????????????????{
????????????????????//強制線程退出并不好,會有異常
????????????????????t.Join();
????????????????}
????????????????ThreadContainer?=?null;
????????????}
????????}
????????//清理超時的任務(wù)
????????private?void?CheckTimeOutJob()
????????{
????????????//首先排除10分鐘還沒轉(zhuǎn)完的
????????????var?timeoOutJobList?=?JobIdList.Where(s?=>?s.Value.AddMinutes(10
)?<?DateTime.Now);
????????????if?(timeoOutJobList?!=?null?&&?timeoOutJobList.Any())
????????????{
????????????????foreach?(var?timeoutJob?in?timeoOutJobList)
????????????????{
????????????????????JobIdList.TryRemove(timeoutJob.Key,?out?DateTime?v);
????????????????}
????????????}
????????????System.Threading.Thread.Sleep(60000);
????????}
????}
????public?class?ActionData
????{
????????//任務(wù)的id,用于排重
????????public?string?ActionId?{?get;?set;?}
????????//執(zhí)行任務(wù)的參數(shù)
????????public?object?Data?{?get;?set;?}
????????//執(zhí)行的任務(wù)
????????public?Action<object>?Job?{?get;?set;?}
????????//發(fā)生異常時候的回調(diào)方法
????????public?Action<Exception>?ErrorCallBack?{?get;?set;?}
????}
以上就是一個線程池的具體實現(xiàn),和具體的業(yè)務(wù)無關(guān),完全可以用于任何適用于線程池的場景,其中有一個注意點,我新加了任務(wù)的標(biāo)示,主要用于排除重復(fù)的任務(wù)被投放多次(只排除正在運行中的任務(wù))。當(dāng)然代碼不是最優(yōu)的,有需要的同學(xué)可以自己去優(yōu)化
使用線程池
接下來,我們利用以上的線程池來完成我們的文檔轉(zhuǎn)碼任務(wù),首先我們啟動的時候初始化一個線程池,并啟動一個獨立線程來不停的往線程池來輸送任務(wù),順便起了一個監(jiān)控線程去監(jiān)視發(fā)送任務(wù)的線程
???????string?lastResId?=?null;
????????string?lastErrorResId?=?null;
????????Dictionary<string,?int>?ResErrNumber?=?new?Dictionary<string,?int>();?
//轉(zhuǎn)碼失敗的資源重試次數(shù)
????????int?MaxErrNumber?=?5;//最多轉(zhuǎn)碼錯誤的資源10次
????????Thread?tPutJoj?=?null;
????????LXThreadPool?pool?=?new?LXThreadPool(4,100);
????????public?void?OnStart()
????????{
????????????//初始化一個線程發(fā)送轉(zhuǎn)碼任務(wù)
????????????tPutJoj?=?new?Thread(PutJob);
????????????tPutJoj.IsBackground?=?true;
????????????tPutJoj.Start();
????????????//初始化?監(jiān)控線程
????????????var?tMonitor?=?new?Thread(MonitorPutJob);
????????????tMonitor.IsBackground?=?true;
????????????tMonitor.Start();
????????}
???????//監(jiān)視發(fā)放job的線程
????????private?void?MonitorPutJob()
????????{
????????????while?(true)
????????????{
????????????????if(tPutJoj?==?null||?!tPutJoj.IsAlive)
????????????????{
????????????????????Log.Error($"發(fā)送轉(zhuǎn)碼任務(wù)線程停止==========");
????????????????????tPutJoj?=?new?Thread(PutJob);
????????????????????tPutJoj.Start();
????????????????????Log.Error($"發(fā)送轉(zhuǎn)碼任務(wù)線程重新初始化并啟動==========");
????????????????}
????????????????System.Threading.Thread.Sleep(5000);
????????????}
????????}
????????private?void?PutJob()
????????{???????????
????????????while?(true)
????????????{
????????????????try
????????????????{
????????????????????//先搜索等待轉(zhuǎn)碼的
????????????????????var?fileList?=?DocResourceRegisterProxy.GetFileList(new?int
[]?{?(int)FileToImgStateEnum.Wait?},?30,?lastResId);
????????????????????Log.Error($"拉取待轉(zhuǎn)碼記錄===總數(shù):lastResId:{lastResId},結(jié)果:
{fileList?.Count()????0}");
????????????????????if?(fileList?==?null?||?!fileList.Any())
????????????????????{
????????????????????????lastResId?=?null;
????????????????????????Log.Error($"待轉(zhuǎn)碼數(shù)量為0,開始拉取轉(zhuǎn)碼失敗記錄,重新轉(zhuǎn)碼==========");
????????????????????????//如果無待轉(zhuǎn),則把出錯的?嘗試
????????????????????????fileList?=?DocResourceRegisterProxy.GetFileList(new?int
[]?{?(int)FileToImgStateEnum.Error,?(int)FileToImgStateEnum.TimeOut,?(int
)FileToImgStateEnum.Fail?},?1,?lastErrorResId);
????????????????????????if?(fileList?==?null?||?!fileList.Any())
????????????????????????{
????????????????????????????lastErrorResId?=?null;
????????????????????????}
????????????????????????else
????????????????????????{
????????????????????????????
//?Log.Error($"開始轉(zhuǎn)碼失敗記錄:{JsonConvert.SerializeObject(fileList)}");
????????????????????????????List<DocResourceRegister>?errFilter?=?new
?List<DocResourceRegister>();
????????????????????????????foreach?(var?errRes?in?fileList)
????????????????????????????{
????????????????????????????????if?(ResErrNumber.TryGetValue(errRes.res_id,?out
?int?number))
????????????????????????????????{
????????????????????????????????????if?(number?>?MaxErrNumber)
????????????????????????????????????{
????????????????????????????????????????Log.Error($"資源:{errRes.res_id}?轉(zhuǎn)了
{MaxErrNumber}次不成功,放棄===========");
????????????????????????????????????????continue;
????????????????????????????????????}
????????????????????????????????????else
????????????????????????????????????{
????????????????????????????????????????errFilter.Add(errRes);
????????????????????????????????????????ResErrNumber[errRes.res_id]?=?number?+?
1;
????????????????????????????????????}
????????????????????????????????}
????????????????????????????????else
????????????????????????????????{
????????????????????????????????????ResErrNumber.Add(errRes.res_id,?1);
????????????????????????????????????errFilter.Add(errRes);
????????????????????????????????}
????????????????????????????}
????????????????????????????fileList?=?errFilter;
????????????????????????????if?(fileList.Any())
????????????????????????????{
????????????????????????????????lastErrorResId?=?fileList.Select(s?=>?s.res_id).Max();
????????????????????????????}
????????????????????????}
????????????????????}
????????????????????else
????????????????????{
????????????????????????lastResId?=?fileList.Select(s?=>?s.res_id).Max();
????????????????????}
????????????????????if?(fileList?!=?null?&&?fileList.Any())
????????????????????{
????????????????????????foreach?(var?file?in?fileList)
????????????????????????{
????????????????????????????//如果?任務(wù)投放線程池失敗,則等待一面繼續(xù)投放
????????????????????????????int?poolRet?=?0;
????????????????????????????while?(poolRet?<=?0)
????????????????????????????{
????????????????????????????????poolRet?=?pool.AddTask(s?=>?{
????????????????????????????????????AliFileService.ConvertToImg(file.res_id?+?
$".{file.res_ext}",?FileToImgFac.Instance(file.res_ext));
????????????????????????????????},?file,?file.res_id);
????????????????????????????????if?(poolRet?<=?0?||?poolRet?>?1)
????????????????????????????????{
????????????????????????????????????Log.Error($"發(fā)放轉(zhuǎn)碼任務(wù)失敗==========線程池返回結(jié)果:
{poolRet}");
????????????????????????????????????System.Threading.Thread.Sleep(1000);
????????????????????????????????}
????????????????????????????}
????????????????????????}
????????????????????}
????????????????????//每一秒去數(shù)據(jù)庫取一次數(shù)據(jù)
????????????????????System.Threading.Thread.Sleep(3000);
????????????????}
????????????????catch
????????????????{
????????????????????continue;
????????????????}
????????????}
????????}
以上就是發(fā)放任務(wù),線程池執(zhí)行任務(wù)的所有代碼,由于具體的轉(zhuǎn)碼代碼涉及到隱私,這里不在提供,如果有需要可以私下找菜菜索要,雖然我深知還有更優(yōu)的方式,但是我覺得線程池這樣的思想可能會對部分人有幫助,其中任務(wù)超時的核心代碼如下(采用了polly插件):
?var?policy=?Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut),?onTimeout:?(
context,?timespan,?task)?=>
????????????????{
????????????????????ret.State=Enum.FileToImgStateEnum.TimeOut;???????????????????
????????????????});
????????????????policy.Execute(s=>{
????????????????????.....
????????????????});
把你的更優(yōu)方案寫在留言區(qū)吧,2020年大家越來越好
熱門工具 換一換