引言
熟悉TPL Dataflow <https://www.cnblogs.com/JulianHuang/p/11177766.html>
博文的朋友可能記得這是個(gè)單體程序,使用TPL Dataflow 處理工作流任務(wù), 在使用Docker部署的過程中, 有一個(gè)問題一直無法回避:
? ? ? ?在單體程序部署的瞬間(服務(wù)不可用)會(huì)有少量流量無法處理;更糟糕的情況下,迭代部署的這個(gè)版本有問題,上線后無法運(yùn)作, 更多的流量沒有得到處理。
? ? ? 背負(fù)神圣使命(巨大壓力)的程序猿心生一計(jì), 為何不將單體程序改成分布式:服務(wù)A只接受數(shù)據(jù),服務(wù)B只處理數(shù)據(jù)。
?
知識(shí)儲(chǔ)備
? ? 消息隊(duì)列和訂閱發(fā)布作為老生常談的兩個(gè)知識(shí)點(diǎn)被反復(fù)提及,按照J(rèn)MS的規(guī)范, 官方稱為點(diǎn)對(duì)點(diǎn)(point to point, queue) 和
訂閱發(fā)布(publish/subscribe,topic ),
點(diǎn)對(duì)點(diǎn)
消息生產(chǎn)者生產(chǎn)消息發(fā)送到queue中,然后消費(fèi)者從queue中取出并且消費(fèi)消息。
注意:
消息被消費(fèi)以后,queue中不再有存儲(chǔ),所以消費(fèi)者不可能消費(fèi)到已經(jīng)被消費(fèi)的消息。
Queue支持存在多個(gè)消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。
當(dāng)沒有消費(fèi)者可用時(shí),這個(gè)消息會(huì)被保存直到有 一個(gè)可用的消費(fèi)者。
發(fā)布/訂閱
消息生產(chǎn)者(發(fā)布)將消息發(fā)布到topic中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到topic的消息會(huì)被所有訂閱者消費(fèi)。
注意:
發(fā)布者將消息發(fā)布到通道中,而不用知道訂閱者是誰(不關(guān)注是否存在訂閱者);訂閱者可收聽自己感興趣的多個(gè)通道, 也不需要知道發(fā)布者是誰(不關(guān)注是哪個(gè)發(fā)布者)。
故如果沒有消費(fèi)者,發(fā)布的消息將得不到處理;
頭腦風(fēng)暴?
本次采用的消息隊(duì)列模型:
* ? ?解耦業(yè)務(wù):? 新建Receiver程序作為生產(chǎn)者,專注于接收并發(fā)送到隊(duì)列;原有的webapp作為消費(fèi)者專注數(shù)據(jù)處理。
* ? ?起到削峰填谷的作用, 若建立多個(gè)消費(fèi)者webapp容器,還能形成負(fù)載均衡的效果。?
Redis 原生支持發(fā)布/訂閱 <https://redis.io/commands/publish>模型,內(nèi)置的List數(shù)據(jù)結(jié)構(gòu)
<https://redis.io/commands/lpush>亦能形成輕量級(jí)MQ的效果。
? ? 需要關(guān)注Redis 兩個(gè)命令( 左進(jìn)右出,右進(jìn)左出同理):
? ? LPUSH? &? RPOP/BRPOP <https://redis.io/commands/brpop>
Brpop 中的B 表示 “Block”,
是一個(gè)rpop命令的阻塞版本:若指定List沒有新元素,在給定時(shí)間內(nèi),該命令會(huì)阻塞當(dāng)前redis客戶端連接,直到超時(shí)返回nil
編程實(shí)踐
本次使用 ASPNetCore 完成RedisMQ的實(shí)踐,引入Redis國(guó)產(chǎn)第三方開源庫(kù)CSRedisCore.
不使用著名的StackExchange.Redis 組件庫(kù)的原因:
*
之前一直使用StackExchange.Redis, 參考了很多資料,做了很多優(yōu)化,并未完全解決RedisTimeoutException問題?
*
StackExchange.Redis基于其多路復(fù)用的連接機(jī)制,不支持阻塞式命令
<https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers>,
故采用了 CSRedisCore,該庫(kù)強(qiáng)調(diào)了API 與Redis官方命令一致,很容易上手
生產(chǎn)者Receiver
?生產(chǎn)者使用LPush?命令向Redis List數(shù)據(jù)結(jié)構(gòu)寫入消息。
------------------截取自Startup.cs-------------------------
public void ConfigureServices(IServiceCollection services)
{
// Redis客戶端要定義成單例, 不然在大流量并發(fā)收數(shù)的時(shí)候, 會(huì)造成redis client來不及釋放。另一方面也確認(rèn)api控制器不是單例模式,
var csredis = new
CSRedisClient(Configuration.GetConnectionString("redis")+",name=receiver");
RedisHelper.Initialization(csredis);
services.AddSingleton(csredis);
services.AddMvc();
}
------------------截取自數(shù)據(jù)接收Controller------------------- [Route("batch")]
[HttpPost]public async Task BatchPutEqidAndProfileIds([FromBody]List<EqidPair>
eqidPairs) { if (!ModelState.IsValid) throw new ArgumentException("Http
Body Payload Error."); var redisKey = $"{DateTime.Now.ToString("yyyyMMdd")}";
eqidPairs= await EqidExtractor.EqidExtractAsync(eqidPairs); if (eqidPairs !=
null && eqidPairs.Any()) RedisHelper.LPush(redisKey, eqidPairs.ToArray());
await Task.CompletedTask; }
?消費(fèi)者webapp
? ? ?根據(jù)以上RedisMQ思路,事件消費(fèi)方式是拉取pull,故需要輪詢Redis? List數(shù)據(jù)結(jié)構(gòu),這里使用ASPNetCore內(nèi)置的
BackgroundService后臺(tái)服務(wù)類實(shí)現(xiàn)后臺(tái)輪詢消費(fèi)任務(wù)。
public class BackgroundJob : BackgroundService { private readonly
IEqidPairHandler _eqidPairHandler;private readonly CSRedisClient[]
_cSRedisClients;private readonly IConfiguration _conf; private readonly ILogger
_logger;public BackgroundJob(IEqidPairHandler eqidPairHandler, CSRedisClient[]
csRedisClients,IConfiguration conf,ILoggerFactory loggerFactory) {
_eqidPairHandler= eqidPairHandler; _cSRedisClients = csRedisClients; _conf =
conf; _logger= loggerFactory.CreateLogger(nameof(BackgroundJob)); } protected
override async Task ExecuteAsync(CancellationToken stoppingToken) {
_logger.LogInformation("Service starting"); if (_cSRedisClients[0] == null) {
_cSRedisClients[0] = new CSRedisClient(_conf.GetConnectionString("redis") + "
,defaultDatabase=" + 0); } RedisHelper.Initialization(_cSRedisClients[0]); while
(!stoppingToken.IsCancellationRequested) { var key = $"
eqidpair:{DateTime.Now.ToString("yyyyMMdd")}"; var eqidpair = RedisHelper.BRPop(
5, key); if (eqidpair != null) await
_eqidPairHandler.AcceptEqidParamAsync(JsonConvert.DeserializeObject<EqidPair>
(eqidpair));// 強(qiáng)烈建議無論如何休眠一段時(shí)間,防止突發(fā)大流量導(dǎo)致webApp進(jìn)程CPU滿載,自行根據(jù)場(chǎng)景設(shè)置合理休眠時(shí)間 await
Task.Delay(10, stoppingToken); } _logger.LogInformation("Service stopping"); } }
?
最后依照引言中的部署原理圖,將Nginx,Receiver, WebApp使用docker-compose工具容器化
根據(jù)docker-compsoe up <https://docs.docker.com/compose/reference/up/>
命令的用法,若容器正在運(yùn)行且對(duì)應(yīng)的Service Configuration或Image并未改變,該容器不會(huì)被ReCreate;
docker-compose? up? 命令默認(rèn)只會(huì)停止Service或Image變更的容器并重建。
If there are existing containers for a service, and the service’s
configuration or image was changed after the container’s creation,?
docker-compose up?picks up the changes by stopping and recreating the
containers (preserving mounted volumes). To prevent Compose from picking up
changes, use the?--no-recreate?flag.
做一次上線測(cè)試驗(yàn)證,修改docker-compose.yml文件Web?app的容器服務(wù),docker-compose up;
僅數(shù)據(jù)處理程序WebApp容器被重建:
Nice,分布式改造上線,效果很明顯,現(xiàn)在可以放心安全的迭代Web App數(shù)據(jù)處理程序。
作者:JulianHuang <https://www.cnblogs.com/JulianHuang/>
碼甲拙見,如有問題請(qǐng)下方留言大膽斧正;碼字+Visio制圖,均為原創(chuàng),看官請(qǐng)不吝好評(píng)+關(guān)注,? ~。。~
本文歡迎轉(zhuǎn)載,請(qǐng)轉(zhuǎn)載頁(yè)面明顯位置注明原作者及原文鏈接。
?
熱門工具 換一換