導讀

          *
          前幾天和一個朋友討論了他們公司的系統(tǒng)問題,傳統(tǒng)的單體應用,集群部署,他說近期服務(wù)的并發(fā)量可能會出現(xiàn)瞬時增加的風險,雖然部署了集群,但是通過壓測后發(fā)現(xiàn)請求延遲仍然是很大,想問問我有什么改進的地方。我沉思了一會,現(xiàn)在去改架構(gòu)顯然是不可能的,于是我給出了一個建議,讓他去做個接口限流,這樣能夠保證瞬時并發(fā)量飆高也不會出現(xiàn)請求延遲的問題,用戶的體驗度也會上去。
          * 至于什么是接口限流?怎么實現(xiàn)接口限流?如何實現(xiàn)單機應用的限流?如何實現(xiàn)分布式應用的限流?本篇文章將會詳細闡述。
          限流的常見幾種算法

          * 常見的限流算法有很多,但是最常用的算法無非以下四種。
          固定窗口計數(shù)器



          * 固定算法的概念如下
          * 將時間劃分為多個窗口
          * 在每個窗口內(nèi)每有一次請求就將計數(shù)器加一
          * 如果計數(shù)器超過了限制數(shù)量,則本窗口內(nèi)所有的請求都被丟棄當時間到達下一個窗口時,計數(shù)器重置。
          * 固定窗口計數(shù)器是最為簡單的算法,但這個算法有時會讓通過請求量允許為限制的兩倍。考慮如下情況:限制 1 秒內(nèi)最多通過 5
          個請求,在第一個窗口的最后半秒內(nèi)通過了 5 個請求,第二個窗口的前半秒內(nèi)又通過了 5 個請求。這樣看來就是在 1 秒內(nèi)通過了 10 個請求。


          滑動窗口計數(shù)器



          * 滑動窗口計數(shù)器算法概念如下:
          * 將時間劃分為多個區(qū)間;
          * 在每個區(qū)間內(nèi)每有一次請求就將計數(shù)器加一維持一個時間窗口,占據(jù)多個區(qū)間;
          * 每經(jīng)過一個區(qū)間的時間,則拋棄最老的一個區(qū)間,并納入最新的一個區(qū)間;
          * 如果當前窗口內(nèi)區(qū)間的請求計數(shù)總和超過了限制數(shù)量,則本窗口內(nèi)所有的請求都被丟棄。
          * 滑動窗口計數(shù)器是通過將窗口再細分,并且按照時間 " 滑動
          ",這種算法避免了固定窗口計數(shù)器帶來的雙倍突發(fā)請求,但時間區(qū)間的精度越高,算法所需的空間容量就越大。
          漏桶算法



          * 漏桶算法概念如下:
          * 將每個請求視作 " 水滴 " 放入 " 漏桶 " 進行存儲;
          * “漏桶 " 以固定速率向外 " 漏 " 出請求來執(zhí)行如果 " 漏桶 " 空了則停止 " 漏水”;
          * 如果 " 漏桶 " 滿了則多余的 " 水滴 " 會被直接丟棄。
          * 漏桶算法多使用隊列實現(xiàn),服務(wù)的請求會存到隊列中,服務(wù)的提供方則按照固定的速率從隊列中取出請求并執(zhí)行,過多的請求則放在隊列中排隊或直接拒絕。
          * 漏桶算法的缺陷也很明顯,當短時間內(nèi)有大量的突發(fā)請求時,即便此時服務(wù)器沒有任何負載,每個請求也都得在隊列中等待一段時間才能被響應。
          令牌桶算法



          * 令牌桶算法概念如下:
          * 令牌以固定速率生成。
          * 生成的令牌放入令牌桶中存放,如果令牌桶滿了則多余的令牌會直接丟棄,當請求到達時,會嘗試從令牌桶中取令牌,取到了令牌的請求可以執(zhí)行。
          * 如果桶空了,那么嘗試取令牌的請求會被直接丟棄。
          * 令牌桶算法既能夠?qū)⑺械恼埱笃骄植嫉綍r間區(qū)間內(nèi),又能接受服務(wù)器能夠承受范圍內(nèi)的突發(fā)請求,因此是目前使用較為廣泛的一種限流算法。
          單體應用實現(xiàn)

          *
          在傳統(tǒng)的單體應用中限流只需要考慮到多線程即可,使用Google開源工具類guava即可。其中有一個RateLimiter專門實現(xiàn)了單體應用的限流,使用的是令牌桶算法。
          * 單體應用的限流不是本文的重點,官網(wǎng)上現(xiàn)成的API,讀者自己去看看即可,這里不再詳細解釋。
          分布式限流

          * 分布式限流和熔斷現(xiàn)在有很多的現(xiàn)成的工具,比如Hystrix,Sentinel 等,但是還是有些企業(yè)不引用外來類庫,因此就需要自己實現(xiàn)。
          * Redis作為單線程多路復用的特性,很顯然能夠勝任這項任務(wù)。
          Redis如何實現(xiàn)

          * 使用令牌桶的算法實現(xiàn),根據(jù)前面的介紹,我們了解到令牌桶算法的基礎(chǔ)需要兩個個變量,分別是桶容量,產(chǎn)生令牌的速率。
          * 這里我們實現(xiàn)的就是每秒產(chǎn)生的速率加上一個桶容量。但是如何實現(xiàn)呢?這里有幾個問題。
          *
          需要保存什么數(shù)據(jù)在redis中?

          * 當前桶的容量,最新的請求時間
          * 以什么數(shù)據(jù)結(jié)構(gòu)存儲?
          *
          因為是針對接口限流,每個接口的業(yè)務(wù)邏輯不同,對并發(fā)的處理也是不同,因此要細化到每個接口的限流,此時我們選用HashMap的結(jié)構(gòu),hashKey是接口的唯一id,可以是請求的uri,里面的分別存儲當前桶的容量和最新的請求時間。
          * 如何計算需要放令牌?
          * 根據(jù)redis保存的上次的請求時間和當前時間比較,如果相差大于的產(chǎn)生令牌的時間(陳某實現(xiàn)的是1秒)則再次產(chǎn)生令牌,此時的桶容量為當前令牌+產(chǎn)生的令牌
          * 如何保證redis的原子性?
          * 保證redis的原子性,使用lua腳本即可解決。
          *
          有了上述的幾個問題,便能很容易的實現(xiàn)。

          開擼

          1、lua腳本如下:
          local ratelimit_info =
          redis.pcall('HMGET',KEYS[1],'last_time','current_token') local last_time =
          ratelimit_info[1] local current_token = tonumber(ratelimit_info[2]) local
          max_token = tonumber(ARGV[1]) local token_rate = tonumber(ARGV[2]) local
          current_time = tonumber(ARGV[3]) if current_token == nil then current_token =
          max_token last_time = current_time else local past_time =
          current_time-last_time if past_time>1000 then current_token =
          current_token+token_rate last_time = current_time end ## 防止溢出 if
          current_token>max_token then current_token = max_token last_time = current_time
          end end local result = 0 if(current_token>0) then result = 1 current_token =
          current_token-1 last_time = current_time end
          redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)
          return result
          * 調(diào)用lua腳本出四個參數(shù),分別是接口方法唯一id,桶容量,每秒產(chǎn)生令牌的數(shù)量,當前請求的時間戳。
          2、 SpringBoot代碼實現(xiàn)

          * 采用Spring-data-redis實現(xiàn)lua腳本的執(zhí)行。
          * Redis序列化配置: /** * 重新注入模板 */ @Bean(value = "redisTemplate") @Primary public
          RedisTemplate redisTemplate(RedisConnectionFactory redisConnectionFactory){
          RedisTemplate<String, Object> template = new RedisTemplate<>();
          template.setConnectionFactory(redisConnectionFactory); ObjectMapper
          objectMapper = new ObjectMapper();
          objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
          objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
          //設(shè)置序列化方式,key設(shè)置string 方式,value設(shè)置成json StringRedisSerializer
          stringRedisSerializer = new StringRedisSerializer();
          Jackson2JsonRedisSerializer jsonRedisSerializer = new
          Jackson2JsonRedisSerializer(Object.class);
          jsonRedisSerializer.setObjectMapper(objectMapper);
          template.setEnableDefaultSerializer(false);
          template.setKeySerializer(stringRedisSerializer);
          template.setHashKeySerializer(stringRedisSerializer);
          template.setValueSerializer(jsonRedisSerializer);
          template.setHashValueSerializer(jsonRedisSerializer); return template; }
          * 限流工具類 /** * @Description 限流工具類 * @Author CJB * @Date 2020/3/19 17:21 */
          public class RedisLimiterUtils { private static StringRedisTemplate
          stringRedisTemplate=ApplicationContextUtils.applicationContext.getBean(StringRedisTemplate.class);
          /** * lua腳本,限流 */ private final static String TEXT="local ratelimit_info =
          redis.pcall('HMGET',KEYS[1],'last_time','current_token')\n" + "local last_time
          = ratelimit_info[1]\n" + "local current_token = tonumber(ratelimit_info[2])\n"
          + "local max_token = tonumber(ARGV[1])\n" + "local token_rate =
          tonumber(ARGV[2])\n" + "local current_time = tonumber(ARGV[3])\n" + "if
          current_token == nil then\n" + " current_token = max_token\n" + " last_time =
          current_time\n" + "else\n" + " local past_time = current_time-last_time\n" + "
          \n" + " if past_time>1000 then\n" + "\t current_token =
          current_token+token_rate\n" + "\t last_time = current_time\n" + " end\n" + "\n"
          + " if current_token>max_token then\n" + " current_token = max_token\n" +
          "\tlast_time = current_time\n" + " end\n" + "end\n" + "\n" + "local result =
          0\n" + "if(current_token>0) then\n" + " result = 1\n" + " current_token =
          current_token-1\n" + " last_time = current_time\n" + "end\n" +
          "redis.call('HMSET',KEYS[1],'last_time',last_time,'current_token',current_token)\n"
          + "return result"; /** * 獲取令牌 * @param key 請求id * @param max 最大能同時承受多少的并發(fā)(桶容量)
          * @param rate 每秒生成多少的令牌 * @return 獲取令牌返回true,沒有獲取返回false */ public static
          boolean tryAcquire(String key, int max,int rate) { List<String> keyList = new
          ArrayList<>(1); keyList.add(key); DefaultRedisScript<Long> script = new
          DefaultRedisScript<>(); script.setResultType(Long.class);
          script.setScriptText(TEXT); return
          Long.valueOf(1).equals(stringRedisTemplate.execute(script,keyList,Integer.toString(max),
          Integer.toString(rate), Long.toString(System.currentTimeMillis()))); } }
          * 采用攔截器+注解的方式實現(xiàn),注解如下: /** * @Description
          限流的注解,標注在類上或者方法上。在方法上的注解會覆蓋類上的注解,同@Transactional * @Author CJB * @Date
          2020/3/20 13:36 */ @Inherited @Target({ElementType.TYPE, ElementType.METHOD})
          @Retention(RetentionPolicy.RUNTIME) public @interface RateLimit { /** *
          令牌桶的容量,默認100 * @return */ int capacity() default 100; /** * 每秒鐘默認產(chǎn)生令牌數(shù)量,默認10個 *
          @return */ int rate() default 10; }
          * 攔截器如下: /** * @Description 限流的攔器 * @Author CJB * @Date 2020/3/19 14:34 */
          @Component public class RateLimiterIntercept implements HandlerInterceptor {
          @Override public boolean preHandle(HttpServletRequest request,
          HttpServletResponse response, Object handler) throws Exception { if (handler
          instanceof HandlerMethod){ HandlerMethod handlerMethod=(HandlerMethod)handler;
          Method method = handlerMethod.getMethod(); /** * 首先獲取方法上的注解 */ RateLimit
          rateLimit = AnnotationUtils.findAnnotation(method, RateLimit.class);
          //方法上沒有標注該注解,嘗試獲取類上的注解 if (Objects.isNull(rateLimit)){ //獲取類上的注解 rateLimit =
          AnnotationUtils.findAnnotation(handlerMethod.getBean().getClass(),
          RateLimit.class); } //沒有標注注解,放行 if (Objects.isNull(rateLimit)) return true;
          //嘗試獲取令牌,如果沒有令牌了 if
          (!RedisLimiterUtils.tryAcquire(request.getRequestURI(),rateLimit.capacity(),rateLimit.rate())){
          //拋出請求超時的異常 throw new TimeOutException(); } } return true; } }
          *
          SpringBoot配置攔截器的代碼就不貼了,以上就是完整的代碼,至此分布式限流就完成了。

          *
          如果覺得作者寫的好,有所收獲的話,點個關(guān)注推薦一下喲?。。?br>

          友情鏈接
          ioDraw流程圖
          API參考文檔
          OK工具箱
          云服務(wù)器優(yōu)惠
          阿里云優(yōu)惠券
          騰訊云優(yōu)惠券
          京東云優(yōu)惠券
          站點信息
          問題反饋
          郵箱:[email protected]
          QQ群:637538335
          關(guān)注微信

                男人和女人搞鸡 | 午夜操屄视频 | 一级A片处破女免费 | 人人操人人玩 | 热色视频 |