在并行編程中,經(jīng)常會遇到多線程間操作共享集合的問題,很多時候大家都很難逃避這個問題做到一種無鎖編程狀態(tài),你也知道一旦給共享集合套上lock之后,并發(fā)和伸縮能力往往會造成很大影響,這篇就來談?wù)勅绾伪M可能的減少lock鎖次數(shù)甚至沒有。
一:緣由
1. 業(yè)務(wù)背景
昨天在review代碼的時候,看到以前自己寫的這么一段代碼,精簡后如下:
private static List<long> ExecuteFilterList(int shopID,
List<MemoryCacheTrade> trades, List<FilterConditon> filterItemList,
MatrixSearchContext searchContext) { var customerIDList = new List<long>(); var
index = 0; Parallel.ForEach(filterItemList, new ParallelOptions() {
MaxDegreeOfParallelism = 4 }, (filterItem) => { var context = new
FilterItemContext() { StartTime = searchContext.StartTime, EndTime =
searchContext.EndTime, ShopID = shopID, Field = filterItem.Field, FilterType =
filterItem.FilterType, ItemList = filterItem.FilterValue, SearchList =
trades.ToList() }; var smallCustomerIDList = context.Execute(); lock
(filterItemList) { if (index == 0) {
customerIDList.AddRange(smallCustomerIDList); index++; } else { customerIDList
= customerIDList.Intersect(smallCustomerIDList).ToList(); } } }); return
customerIDList; }
這段代碼實現(xiàn)的功能是這樣的,filterItemList承載著所有原子化的篩選條件,然后用多線程的形式并發(fā)執(zhí)行里面的item,最后將每個item獲取的客戶人數(shù)集合在高層進(jìn)行整體求交,畫個簡圖就是下面這樣。
2. 問題分析
其實這代碼存在著一個很大的問題,在Parallel中直接使用lock鎖的話,filterItemList有多少個,我的lock就會鎖多少次,這對并發(fā)和伸縮性是有一定影響的,現(xiàn)在就來想想怎么優(yōu)化吧!
3. 測試案例
為了方便演示,我模擬了一個小案例,方便大家看到實時結(jié)果,修改后的代碼如下:
public static void Main(string[] args) { var filterItemList = new
List<string>() { "conditon1", "conditon2", "conditon3", "conditon4",
"conditon5", "conditon6" }; ParallelTest1(filterItemList); } public static void
ParallelTest1(List<string> filterItemList) { var totalCustomerIDList = new
List<int>(); bool isfirst = true; Parallel.ForEach(filterItemList, new
ParallelOptions() { MaxDegreeOfParallelism = 2 }, (query) => { var
smallCustomerIDList = GetCustomerIDList(query); lock (filterItemList) { if
(isfirst) { totalCustomerIDList.AddRange(smallCustomerIDList); isfirst = false;
} else { totalCustomerIDList =
totalCustomerIDList.Intersect(smallCustomerIDList).ToList(); }
Console.WriteLine($"{DateTime.Now} 被鎖了"); } });
Console.WriteLine($"最后交集客戶ID:{string.Join(",", totalCustomerIDList)}"); }
public static List<int> GetCustomerIDList(string query) { var dict = new
Dictionary<string, List<int>>() { ["conditon1"] = new List<int>() { 1, 2, 4, 7
}, ["conditon2"] = new List<int>() { 1, 4, 6, 7 }, ["conditon3"] = new
List<int>() { 1, 4, 5, 7 }, ["conditon4"] = new List<int>() { 1, 2, 3, 7 },
["conditon5"] = new List<int>() { 1, 2, 4, 5, 7 }, ["conditon6"] = new
List<int>() { 1, 3, 4, 7, 9 }, }; return dict[query]; } ------ output ------
2020/04/21 15:53:34 被鎖了 2020/04/21 15:53:34 被鎖了 2020/04/21 15:53:34 被鎖了
2020/04/21 15:53:34 被鎖了 2020/04/21 15:53:34 被鎖了 2020/04/21 15:53:34 被鎖了
最后交集客戶ID:1,7
二:第一次優(yōu)化
從結(jié)果中可以看到,filterItemList有6個,鎖次數(shù)也是6次,那如何降低呢?
其實實現(xiàn)Parallel代碼的FCL大神也考慮到了這個問題,從底層給了一個很好的重載,如下所示:
public static ParallelLoopResult ForEach<TSource,
TLocal>(OrderablePartitioner<TSource> source, ParallelOptions parallelOptions,
Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal>
body, Action<TLocal> localFinally);
這個重載很特別,多了兩個參數(shù)localInit和localFinally,過會說一下什么意思,先看修改后的代碼體會一下
public static void ParallelTest2(List<string> filterItemList) { var
totalCustomerIDList = new List<int>(); var isfirst = true;
Parallel.ForEach<string, List<int>>(filterItemList, new ParallelOptions() {
MaxDegreeOfParallelism = 2 }, () => { return null; }, (query, loop, index,
smalllist) => { var smallCustomerIDList = GetCustomerIDList(query); if
(smalllist == null) return smallCustomerIDList; return
smalllist.Intersect(smallCustomerIDList).ToList(); }, (finalllist) => { lock
(filterItemList) { if (isfirst) { totalCustomerIDList.AddRange(finalllist);
isfirst = false; } else { totalCustomerIDList =
totalCustomerIDList.Intersect(finalllist).ToList(); }
Console.WriteLine($"{DateTime.Now} 被鎖了"); } });
Console.WriteLine($"最后交集客戶ID:{string.Join(",", totalCustomerIDList)}"); }
------- output ------ 2020/04/21 16:11:46 被鎖了 2020/04/21 16:11:46 被鎖了
最后交集客戶ID:1,7 Press any key to continue . . .
很好,這次優(yōu)化將lock次數(shù)從6次降到了2次,這里我用了 new ParallelOptions() { MaxDegreeOfParallelism =
2 }
設(shè)置了并發(fā)度為最多2個CPU核,程序跑起來后會開兩個線程,將一個大集合劃分為2個小集合,相當(dāng)于1個集合3個條件,第一個線程在執(zhí)行3個條件的起始處會執(zhí)行你的localInit函數(shù),在3個條件迭代完之后再執(zhí)行你的localFinally,第二個線程也是按照同樣方式執(zhí)行自己的3個條件,說的有點晦澀,畫一張圖說明吧。
三: 第二次優(yōu)化
如果你了解Task<T>這種帶有返回值的Task,這就好辦了,多少個filterItemList就可以開多少個Task,反正Task底層是使用線程池承載的,所以不用怕,這樣就完美的實現(xiàn)無鎖編程。
public static void ParallelTest3(List<string> filterItemList) { var
totalCustomerIDList = new List<int>(); var tasks = new
Task<List<int>>[filterItemList.Count]; for (int i = 0; i <
filterItemList.Count; i++) { tasks[i] = Task.Factory.StartNew((query) => {
return GetCustomerIDList(query.ToString()); }, filterItemList[i]); }
Task.WaitAll(tasks); for (int i = 0; i < tasks.Length; i++) { var
smallCustomerIDList = tasks[i].Result; if (i == 0) {
totalCustomerIDList.AddRange(smallCustomerIDList); } else { totalCustomerIDList
= totalCustomerIDList.Intersect(smallCustomerIDList).ToList(); } }
Console.WriteLine($"最后交集客戶ID:{string.Join(",", totalCustomerIDList)}"); }
------ output ------- 最后交集客戶ID:1,7 Press any key to continue . . .
四:總結(jié)
我們將原來的6個lock優(yōu)化到了無鎖編程,但并不說明無鎖編程就一定比帶有l(wèi)ock的效率高,大家要結(jié)合自己的使用場景合理的使用和混合搭配。
好了,本篇就說到這里,希望對您有幫助。
如您有更多問題與我互動,掃描下方進(jìn)來吧~
熱門工具 換一換