限流方案

q1871901600 发布于 2024-11-16 28 次阅读


一,Gateway基于redis+lua脚本限流

spring cloud官方提供了RequestRateLimiter过滤器工厂,基于redis+lua脚本方式采用令牌桶算法实现了限流。

https://docs.spring.io/spring-cloud-gateway/docs/current/reference/html/#the-requestratelimiter-gatewayfilter-factory

请求不被允许时返回状态:HTTP 429 - Too Many Requests。

1)添加依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
</dependency>

2)修改 application.yml ,添加redis配置和RequestRateLimiter过滤器工厂配置

spring:
  application:
    name: biz-gateway

  data:
    #配置redis地址
    redis:
      host: localhost
      port: 6379
      database: 0
      timeout: 5000
      lettuce:
        pool:
          max-active: 200
          max-wait: 10000
          max-idle: 100
          min-idle: 10
  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

    gateway:
      #设置路由:路由id、路由到微服务的uri、断言
      routes:
        - id: order_route  #路由ID,全局唯一,建议配置服务名
          # 测试 http://localhost:8888/order/findOrderByUserId/1
          uri: lb://mall-order  #lb 整合负载均衡器ribbon,loadbalancer
          predicates:
            - Path=/order/**   # 断言,路径相匹配的进行路由
          #配置过滤器工厂
          filters:
            - name: RequestRateLimiter   #限流过滤器
              args:
                redis-rate-limiter.replenishRate: 1 #令牌桶每秒填充速率
                redis-rate-limiter.burstCapacity: 2 #令牌桶的总容量
                key-resolver: "#{@keyResolver}" #使用SpEL表达式,从Spring容器中获取Bean对象 

3) 配置keyResolver,可以指定限流策略,比如url限流,参数限流,ip限流等等

@Bean
KeyResolver keyResolver() {
    //url限流
    return exchange -> Mono.just(exchange.getRequest().getURI().getPath());
    //参数限流
    //return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("user"));
}

给出操作Redis的Lua脚本 request_rate_limiter.lua代码,算法实现的核心步骤基本都在里面:

local tokens_key = KEYS[1]         -- 令牌桶保存在redis中的 key
local timestamp_key = KEYS[2]      -- 最近一次拿令牌的时间的 key
--redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
​
local rate = tonumber(ARGV[1])      -- 每秒往桶里放的令牌数,即填充速率
local capacity = tonumber(ARGV[2])  -- 桶的最大容量
local now = tonumber(ARGV[3])       -- 当前时间,单位:秒
local requested = tonumber(ARGV[4]) -- 每个请求需要几个令牌,默认 1 个
​
local fill_time = capacity/rate     -- 从 0 开始,需要几秒可以填满桶
local ttl = math.floor(fill_time*2) -- key 的存活时间,如果超过了这个时间,桶就填充满了,限流数据就没有存在的必要了,过期后再遇到请求可以从头再计算
​
local last_tokens = tonumber(redis.call("get", tokens_key))  -- 目前桶里的令牌数
if last_tokens == nil then                                   -- 如果还没有初始化,则令牌数按最大容量capacity计
  last_tokens = capacity
end
​
local last_refreshed = tonumber(redis.call("get", timestamp_key))  -- 最后一次取令牌的时间
if last_refreshed == nil then                                      -- 如果还没有初始化,计为 0
  last_refreshed = 0
end
​
local delta = math.max(0, now-last_refreshed)                      -- 上次取令牌过了多少秒
local filled_tokens = math.min(capacity, last_tokens+(delta*rate)) -- 本次放入新令牌后的令牌总数, filled_tokens = 填充了令牌后的令牌总数, delta*rate = 上次取令牌后填充的令牌数
local allowed = filled_tokens >= requested                         -- 现在桶里的令牌数够不够本次执行: 填充后的令牌数 >= 本次需要的令牌数
local new_tokens = filled_tokens                                   -- 新的令牌数, 即 填充完令牌后,剩余的令牌数
local allowed_num = 0
if allowed then                                                    -- 如果当前令牌数足够本次执行
  new_tokens = filled_tokens - requested                           -- 新的令牌数 = 填充完令牌后的令牌数 - 本次消耗的令牌数
  allowed_num = 1                                                  -- 允许的请求个数为1,表示允许本次请求
end
​
if ttl > 0 then                                      -- 如果当前key还没有过期,则需要更新缓存,如果已经过了有效期的话就不需要更新数据了,直接让数据过期就可以
  redis.call("setex", tokens_key, ttl, new_tokens)   -- 更新令牌桶里的令牌数
  redis.call("setex", timestamp_key, ttl, now)       -- 更新最后一次执行时间
end
​
-- return { allowed_num, new_tokens, capacity, filled_tokens, requested, new_tokens }
return { allowed_num, new_tokens }               -- 根据返回的allowed_num==1判断是否允许本次请求, new_tokens为当前剩余令牌数

脚本的调用方为 org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter.java 类,核心方法是 RedisRateLimiter#isAllowed(String routeId, String id) ,在这个方法中基于Lua脚本的返回值判断是否允许本次请求。下面是对应的源码:

/**
   * 这个方法用简单的令牌桶(token bucket)算法,依赖于自动执行的Redis脚本,
   * 在获取计数和写入新计数之间不能运行其他操作。
   * 
   * @param routeId 对应配置文件中的 routes.id 值,如 微服务名称 user
   * @param id 对应请求中解析出来的标识, 如 按IP限流,按用户ID限流,按请求实例的host限流
   */
  @Override
  @SuppressWarnings("unchecked")
  public Mono<Response> isAllowed(String routeId, String id) {
    if (!this.initialized.get()) {
      throw new IllegalStateException("RedisRateLimiter is not initialized");
    }
​
    // 获取配置文件中对此 route 的具体配置
    Config routeConfig = loadConfiguration(routeId);
​
    // 您希望允许用户每秒执行多少次请求?
    int replenishRate = routeConfig.getReplenishRate();
    // 令牌桶的最大容量是多少?
    int burstCapacity = routeConfig.getBurstCapacity();
    // 每个请求需要使用几个token?
    int requestedTokens = routeConfig.getRequestedTokens();
​
    try {
      // 返回2个要在Redis中使用的key:对应令牌桶的key 和 对令牌桶的上次操作时间的key
      List<String> keys = getKeys(id);
​
      // 用于LUA脚本的参数,time()返回单位为秒的unixtime
      List<String> scriptArgs = Arrays.asList(replenishRate + "",
          burstCapacity + "", Instant.now().getEpochSecond() + "",
          requestedTokens + "");
      // 返回两个结果值 allowed, tokens_left,具体含义参考上面的LUA脚本源码
      Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
      // 根据LUA脚本的 2 个返回值,进行后续逻辑处理
      return flux.onErrorResume(throwable -> {
        // 如果调用LUA脚本出现了异常,则返回固定的结果值,这个结果值意味着 本次request会被允许
        return Flux.just(Arrays.asList(1L, -1L));
      }).reduce(new ArrayList<Long>(), (longs, l) -> {
        longs.addAll(l);
        return longs;
      }).map(results -> {
        // 处理LUA返回的结果值,第一个参数表示是否允许本次操作,第二个参数表示令牌桶中的剩余令牌数
        boolean allowed = results.get(0) == 1L;
        Long tokensLeft = results.get(1);
        
        // 把这些结果值保存到response中,返回给上游
        Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
        return response;
      });
    }
    catch (Exception e) {
      /*
       * 我们不想强依赖Redis来控制流量,这里主要是出错的时候能发出告警即可(如果出错太多次的话)。脚本出错的概率是 0.01%。 
       */
      log.error("Error determining if user allowed from redis", e);
    }
    // 如果报错了,则会允许本次请求
    return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
  }

4) 测试

url限流:http://localhost:8888/order/findOrderByUserId/1

参数限流:http://localhost:8888/order/findOrderByUserId/1?user=fox

二,Gateway整合sentinel限流

从 1.6.0 版本开始,Sentinel 提供了 Spring Cloud Gateway 的适配模块,可以提供两种资源维度的限流:

  • route 维度:即在 Spring 配置文件中配置的路由条目,资源名为对应的 routeId
  • 自定义 API 维度:用户可以利用 Sentinel 提供的 API 来自定义一些 API 分组

sentinel网关流控:https://sentinelguard.io/zh-cn/docs/api-gateway-flow-control.html

Gateway整合sentinel实现网关限流

1)引入依赖

<!-- gateway接入sentinel  -->
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-alibaba-sentinel-gateway</artifactId>
</dependency>

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>

2)添加yml配置,接入sentinel dashboard,通过sentinel控制台配置网关流控规则

server:
  port: 8888
spring:
  application:
    name: gateway-sentinel
  main:
    allow-bean-definition-overriding: true
  #配置nacos注册中心地址
  cloud:
    nacos:
      discovery:
        server-addr: 127.0.0.1:8848

    sentinel:
      transport:
        # 添加sentinel的控制台地址
        dashboard: 127.0.0.1:8080

    gateway:
      #设置路由:路由id、路由到微服务的uri、断言
      routes:
      - id: order_route  #路由ID,全局唯一,建议配合服务名
        uri: lb://mall-order  #lb 整合负载均衡器loadbalancer
        predicates:
        - Path=/order/**

      - id: user_route
        uri: lb://mall-user  #lb 整合负载均衡器loadbalancer
        predicates:
        - Path=/user/**

注意:基于SpringBoot3的 Spring Cloud Gateway和Sentinel还存在兼容性问题,等待Sentinel官方对最新的Gateway适配包更新

一个会写python的Java工程师
最后更新于 2024-11-16