導讀
*
前幾天和一個朋友討論了他們公司的系統(tǒng)問題,傳統(tǒng)的單體應用,集群部署,他說近期服務(wù)的并發(fā)量可能會出現(xiàn)瞬時增加的風險,雖然部署了集群,但是通過壓測后發(fā)現(xiàn)請求延遲仍然是很大,想問問我有什么改進的地方。我沉思了一會,現(xiàn)在去改架構(gòu)顯然是不可能的,于是我給出了一個建議,讓他去做個接口限流,這樣能夠保證瞬時并發(fā)量飆高也不會出現(xiàn)請求延遲的問題,用戶的體驗度也會上去。
* 至于什么是接口限流?怎么實現(xiàn)接口限流?如何實現(xiàn)單機應用的限流?如何實現(xiàn)分布式應用的限流?本篇文章將會詳細闡述。
限流的常見幾種算法
* 常見的限流算法有很多,但是最常用的算法無非以下四種。
固定窗口計數(shù)器
* 固定算法的概念如下
* 將時間劃分為多個窗口
* 在每個窗口內(nèi)每有一次請求就將計數(shù)器加一
* 如果計數(shù)器超過了限制數(shù)量,則本窗口內(nèi)所有的請求都被丟棄當時間到達下一個窗口時,計數(shù)器重置。
* 固定窗口計數(shù)器是最為簡單的算法,但這個算法有時會讓通過請求量允許為限制的兩倍。考慮如下情況:限制 1 秒內(nèi)最多通過 5
個請求,在第一個窗口的最后半秒內(nèi)通過了 5 個請求,第二個窗口的前半秒內(nèi)又通過了 5 個請求。這樣看來就是在 1 秒內(nèi)通過了 10 個請求。
滑動窗口計數(shù)器
* 滑動窗口計數(shù)器算法概念如下:
* 將時間劃分為多個區(qū)間;
* 在每個區(qū)間內(nèi)每有一次請求就將計數(shù)器加一維持一個時間窗口,占據(jù)多個區(qū)間;
* 每經(jīng)過一個區(qū)間的時間,則拋棄最老的一個區(qū)間,并納入最新的一個區(qū)間;
* 如果當前窗口內(nèi)區(qū)間的請求計數(shù)總和超過了限制數(shù)量,則本窗口內(nèi)所有的請求都被丟棄。
* 滑動窗口計數(shù)器是通過將窗口再細分,并且按照時間 " 滑動
",這種算法避免了固定窗口計數(shù)器帶來的雙倍突發(fā)請求,但時間區(qū)間的精度越高,算法所需的空間容量就越大。
漏桶算法
* 漏桶算法概念如下:
* 將每個請求視作 " 水滴 " 放入 " 漏桶 " 進行存儲;
* “漏桶 " 以固定速率向外 " 漏 " 出請求來執(zhí)行如果 " 漏桶 " 空了則停止 " 漏水”;
* 如果 " 漏桶 " 滿了則多余的 " 水滴 " 會被直接丟棄。
* 漏桶算法多使用隊列實現(xiàn),服務(wù)的請求會存到隊列中,服務(wù)的提供方則按照固定的速率從隊列中取出請求并執(zhí)行,過多的請求則放在隊列中排隊或直接拒絕。
* 漏桶算法的缺陷也很明顯,當短時間內(nèi)有大量的突發(fā)請求時,即便此時服務(wù)器沒有任何負載,每個請求也都得在隊列中等待一段時間才能被響應。
令牌桶算法
* 令牌桶算法概念如下:
* 令牌以固定速率生成。
* 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多余的令牌會直接丟棄,當請求到達時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執(zhí)行。
* 如果桶空了,那么嘗試取令牌的請求會被直接丟棄。
* 令牌桶算法既能夠?qū)⑺械恼埱笃骄植嫉綍r間區(qū)間內(nèi),又能接受服務(wù)器能夠承受范圍內(nèi)的突發(fā)請求,因此是目前使用較為廣泛的一種限流算法。
單體應用實現(xiàn)
*
在傳統(tǒng)的單體應用中限流只需要考慮到多線程即可,使用Google開源工具類guava即可。其中有一個RateLimiter專門實現(xiàn)了單體應用的限流,使用的是令牌桶算法。
* 單體應用的限流不是本文的重點,官網(wǎng)上現(xiàn)成的API,讀者自己去看看即可,這里不再詳細解釋。
分布式限流
* 分布式限流和熔斷現(xiàn)在有很多的現(xiàn)成的工具,比如Hystrix,Sentinel 等,但是還是有些企業(yè)不引用外來類庫,因此就需要自己實現(xiàn)。
* Redis作為單線程多路復用的特性,很顯然能夠勝任這項任務(wù)。
Redis如何實現(xiàn)
* 使用令牌桶的算法實現(xiàn),根據(jù)前面的介紹,我們了解到令牌桶算法的基礎(chǔ)需要兩個個變量,分別是桶容量,產(chǎn)生令牌的速率。
* 這里我們實現(xiàn)的就是每秒產(chǎn)生的速率加上一個桶容量。但是如何實現(xiàn)呢?這里有幾個問題。
*
需要保存什么數(shù)據(jù)在redis中?
* 當前桶的容量,最新的請求時間
* 以什么數(shù)據(jù)結(jié)構(gòu)存儲?
*
因為是針對接口限流,每個接口的業(yè)務(wù)邏輯不同,對并發(fā)的處理也是不同,因此要細化到每個接口的限流,此時我們選用HashMap的結(jié)構(gòu),hashKey是接口的唯一id,可以是請求的uri,里面的分別存儲當前桶的容量和最新的請求時間。
* 如何計算需要放令牌?
* 根據(jù)redis保存的上次的請求時間和當前時間比較,如果相差大于的產(chǎn)生令牌的時間(陳某實現(xiàn)的是1秒)則再次產(chǎn)生令牌,此時的桶容量為當前令牌+產(chǎn)生的令牌
* 如何保證redis的原子性?
* 保證redis的原子性,使用lua腳本即可解決。
*
有了上述的幾個問題,便能很容易的實現(xiàn)。
開擼
1、lua腳本如下:
local ratelimit_info =
redis.pcall('HMGET',KEYS[1],'last_time','current_token') local last_time =
ratelimit_info[1] local current_token = tonumber(ratelimit_info[2]) local
max_token = tonumber(ARGV[1]) local token_rate = tonumber(ARGV[2]) local
current_time = tonumber(ARGV[3]) if current_token == nil then current_token =
max_token last_time = current_time else local past_time =
current_time-last_time if past_time>1000 then current_token =
current_token+token_rate last_time = current_time end ## 防止溢出 if
current_token>max_token then current_token = max_token last_time = current_time
end end local result = 0 if(current_token>0) then result = 1 current_token =
current_token-1 last_time = current_time end
redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)
return result
* 調(diào)用lua腳本出四個參數(shù),分別是接口方法唯一id,桶容量,每秒產(chǎn)生令牌的數(shù)量,當前請求的時間戳。
2、 SpringBoot代碼實現(xiàn)
* 采用Spring-data-redis實現(xiàn)lua腳本的執(zhí)行。
* Redis序列化配置: /** * 重新注入模板 */ @Bean(value = "redisTemplate") @Primary public
RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory){
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory); ObjectMapper
objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
//設(shè)置序列化方式,key設(shè)置string 方式,value設(shè)置成json StringRedisSerializer
stringRedisSerializer = new StringRedisSerializer();
Jackson2JsonRedisSerializer jsonRedisSerializer = new
Jackson2JsonRedisSerializer(Object.class);
jsonRedisSerializer.setObjectMapper(objectMapper);
template.setEnableDefaultSerializer(false);
template.setKeySerializer(stringRedisSerializer);
template.setHashKeySerializer(stringRedisSerializer);
template.setValueSerializer(jsonRedisSerializer);
template.setHashValueSerializer(jsonRedisSerializer); return template; }
* 限流工具類 /** * @Description 限流工具類 * @Author CJB * @Date 2020/3/19 17:21 */
public class RedisLimiterUtils { private static StringRedisTemplate
stringRedisTemplate=ApplicationContextUtils.applicationContext.getBean(StringRedisTemplate.class);
/** * lua腳本,限流 */ private final static String TEXT="local ratelimit_info =
redis.pcall('HMGET',KEYS[1],'last_time','current_token')\n" + "local last_time
= ratelimit_info[1]\n" + "local current_token = tonumber(ratelimit_info[2])\n"
+ "local max_token = tonumber(ARGV[1])\n" + "local token_rate =
tonumber(ARGV[2])\n" + "local current_time = tonumber(ARGV[3])\n" + "if
current_token == nil then\n" + " current_token = max_token\n" + " last_time =
current_time\n" + "else\n" + " local past_time = current_time-last_time\n" + "
\n" + " if past_time>1000 then\n" + "\t current_token =
current_token+token_rate\n" + "\t last_time = current_time\n" + " end\n" + "\n"
+ " if current_token>max_token then\n" + " current_token = max_token\n" +
"\tlast_time = current_time\n" + " end\n" + "end\n" + "\n" + "local result =
0\n" + "if(current_token>0) then\n" + " result = 1\n" + " current_token =
current_token-1\n" + " last_time = current_time\n" + "end\n" +
"redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)\n"
+ "return result"; /** * 獲取令牌 * @param key 請求id * @param max 最大能同時承受多少的并發(fā)(桶容量)
* @param rate 每秒生成多少的令牌 * @return 獲取令牌返回true,沒有獲取返回false */ public static
boolean tryAcquire(String key, int max,int rate) { List<String> keyList = new
ArrayList<>(1); keyList.add(key); DefaultRedisScript<Long> script = new
DefaultRedisScript<>(); script.setResultType(Long.class);
script.setScriptText(TEXT); return
Long.valueOf(1).equals(stringRedisTemplate.execute(script,keyList,Integer.toString(max),
Integer.toString(rate), Long.toString(System.currentTimeMillis()))); } }
* 采用攔截器+注解的方式實現(xiàn),注解如下: /** * @Description
限流的注解,標注在類上或者方法上。在方法上的注解會覆蓋類上的注解,同@Transactional * @Author CJB * @Date
2020/3/20 13:36 */ @Inherited @Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME) public @interface RateLimit { /** *
令牌桶的容量,默認100 * @return */ int capacity() default 100; /** * 每秒鐘默認產(chǎn)生令牌數(shù)量,默認10個 *
@return */ int rate() default 10; }
* 攔截器如下: /** * @Description 限流的攔器 * @Author CJB * @Date 2020/3/19 14:34 */
@Component public class RateLimiterIntercept implements HandlerInterceptor {
@Override public boolean preHandle(HttpServletRequest request,
HttpServletResponse response, Object handler) throws Exception { if (handler
instanceof HandlerMethod){ HandlerMethod handlerMethod=(HandlerMethod)handler;
Method method = handlerMethod.getMethod(); /** * 首先獲取方法上的注解 */ RateLimit
rateLimit = AnnotationUtils.findAnnotation(method, RateLimit.class);
//方法上沒有標注該注解,嘗試獲取類上的注解 if (Objects.isNull(rateLimit)){ //獲取類上的注解 rateLimit =
AnnotationUtils.findAnnotation(handlerMethod.getBean().getClass(),
RateLimit.class); } //沒有標注注解,放行 if (Objects.isNull(rateLimit)) return true;
//嘗試獲取令牌,如果沒有令牌了 if
(!RedisLimiterUtils.tryAcquire(request.getRequestURI(),rateLimit.capacity(),rateLimit.rate())){
//拋出請求超時的異常 throw new TimeOutException(); } } return true; } }
*
SpringBoot配置攔截器的代碼就不貼了,以上就是完整的代碼,至此分布式限流就完成了。
*
如果覺得作者寫的好,有所收獲的話,點個關(guān)注推薦一下喲?。。?br>
熱門工具 換一換