1. 令牌桶按用户维度限流
前文golang/x/time/rate演示了基于整体请求速率的令牌桶限流;
那基于用户id、ip、apikey请求速率的限流(更贴近生产的需求), 阁下又该如何应对?
那这个问题就从全局速率变成了按照用户维度(group by userid)来做限流,那么
- 早先的全局的rateLimiter就要变成人手一个令牌桶,也就是userid:rateLimiter的键值对集合,select count( * ) from table ---> select userid, count(*) from table group by userid
- 使用缓存组件来存储维度键值对: 缓存的剔除机制来清理不再访问的键值对 (30min过期,10min周期清理内存)。
1var userLimiters = cache.New(time.Minute*30, 10) // 10 items per minute 2func limiterForUser(userID string) *rate.Limiter { 3 if v, found := userLimiters.Get(userID); found { 4 return v.(*rate.Limiter) 5 } 6 7 l := rate.NewLimiter(rate.Every(time.Minute/60), 10) 8 userLimiters.Set(userID, l, cache.DefaultExpiration) 9 return l 10} 11 12// 更细化的限流: 针对同一用户的请求次数限速, 增加了细粒度的用户维度,需要维护 用户与对应限速器的映射关系 13func userRatelimitMiddleware(c *gin.Context) { 14 userID := c.GetString("userID") // 从每个请求context的key中取得信息, 这个key对于req context是排他性的 15 if userID == "" { 16 c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Unauthorized"}) 17 return 18 } 19 if userID == "" { 20 userID = c.GetString("x-api-key") 21 } 22 23 if userID == "" { 24 userID = c.ClientIP() 25 } 26 limiter := limiterForUser(userID) // 通过userid维度找到对应的限速器 27 if !limiter.Allow() { 28 c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{"error": "Too many requests"}) 29 return 30 } 31 c.Next() 32} 33
2. redis 作为限流器外置存储
这个思路也是极其常见的行为: redis可以成为用户令牌桶的全局中心存储: 当多个负载层需要读写用户限流器时,与redis交互。
本次通过golang的实战,深入理解基于redis的令牌桶限流器的算法实现。
① 请求到达负载层,被负载层识别为userid=junio
② 负载层请求redis获取该用户的token bucket的当前状态:hget userbucket:junio tokens last_time
③ 基于当前时间now和last_time,计算流逝的时间,再根据rate计算这一阶段下发了多少tokens:delta=(now-last_time) * r/1000,加上redis原始记录的token,就是本次请求时bucket中能用的tokens, 注意:令牌数量最多不能超过cap
④ 如果tokens>=1, 表示桶中有令牌,可放行请求,tokens数量减1
⑤ 最后将本次处理完后的 tokens和last_time=now写入原用户令牌桶hset userbucket:junio tokens 20 last_time 990
使用redis 中的hashmap存储用户的tokenbucket状态,应用存在
读取redis- 计算- 回写redis过程,使用redis lua的脚本执行三个动作,以保证线程安全。
为什么lua脚本能保证线程安全呢?
主要得益于 Redis 的单线程架构和原子性执行机制: 加载并执行lua脚本时所有的redis操作作为一个整体完成; 整个脚本执行期间没有其他命令可以插入。
1// 读取- 计算 - 重新赋值都在一个 lua 脚本里面 2var redisScript = ` 3 local key = KEYS[1] 4 local capacity = tonumber(ARGV[1]) 5 local rate = tonumber(ARGV[2]) 6 local now = tonumber(ARGV[3]) 7 local tokens = tonumber(redis.call('hget', key, 'tokens') or '-1') 8 local last_time = tonumber(redis.call('hget', key, 'last_time') or '-1') 9 10 if tokens == -1 or last_time == -1 then 11 tokens = capacity 12 last_time = now 13 else 14 local elapsed = now - last_time 15 if elapsed < 0 16 then elapsed = 0 17 end 18 local delta = elapsed * rate / 1000 19 tokens = tokens + delta 20 if tokens > capacity then 21 tokens = capacity 22 end 23 last_time = now 24 end 25 local allow = 0 26 if tokens >= 1 then 27 allow = 1 28 tokens= tokens - 1 29 else 30 allow = 0 31 end 32 33 redis.call('hset', key, 'tokens', tokens) 34 redis.call('hset', key, 'last_time', last_time) 35 redis.call('PEXPIRE', key, math.max(1000, 2 * math.ceil((capacity / rate) * 5000))) 36 return allow 37` 38
注意
- 上面还使用的redis expire机制: redis expire不是滑动过期,但是每次被请求触发执行的时候就重新设置TTL, 表现为“滑动过期”。
- 除了hset/hget ,还有hmget可用,另外这些操作还有配套的TTL指令,eg:
hset key EXAT 1740470400 FIELDS 2 field1 "Hello" field2 "World"。
golang应用层的写法如下:
1func (r *RedisLimiter) Allow(c *gin.Context, userid string) bool { 2 key := r.keyprefix + userid // 定位这个用户的token bucket 3 now := time.Now().UnixMilli() 4 // Check if the key exists in Redis 5 rCmd := r.redis.Eval(redisScript, []string{key}, r.cap, r.rate, now) 6 res, err := rCmd.Result() 7 if err != nil { 8 log.Printf("get from redis failure. ", err) 9 return false 10 } 11 if allow, ok := res.(int64); ok { // 注意:lua返回的0,1 值对应golang的int64 12 log.Printf("%v %v \n", allow, res) 13 return allow == 1 14 } else { 15 log.Printf("get from redis failure. ", err) 16 return false 17 } 18} 19
3. 总结展望
至此限流第二弹结束了,本文紧接掘金爆文🎨 新来的外包,限流算法用的这么6,进一步讲述了
① 实现根据特定业务维度的限流: 从全局限流器转换成针对业务维度的人手一个限流器;
② redis作为限流计数器的外置存储,令牌桶算法在redis上的算法实现:核心是使用hashmap存储当前请求用户的令牌桶状态(current_tokens, last_time), 落地时注意使用lua脚本避免竞态条件。
后面35+外包er针对限流设计还会再更新几个彩蛋, 期待一键三连,交个朋友, 35+报团不迷路。
《🎨 新来的外包,在大群分享了它的限流算法的实现》 是转载文章,点击查看原文。