APISIX的流量控制插件limit-conn用于限制用户对单个服务的并发请求数,即当用户(依据请求中的一个或者多个变量)对路由的并发请求数达到了一定限制,开启限流策略,返回自定义的状态码和响应信息。
源码实现
初始化
配置初始化包括最大并发请求数,超过并发数后被延迟处理的额外并发请求数,默认处理延迟时间,以及限流请求的应答返回码和应答信息;
local plugin_name = "limit-conn"
local schema = {
type = "object",
properties = {
conn = {type = "integer", exclusiveMinimum = 0},
burst = {type = "integer", minimum = 0},
default_conn_delay = {type = "number", exclusiveMinimum = 0},
only_use_default_delay = {type = "boolean", default = false},
key = {type = "string"},
key_type = {type = "string",
enum = {"var", "var_combination"},
default = "var",
},
rejected_code = {
type = "integer", minimum = 200, maximum = 599, default = 503
},
rejected_msg = {
type = "string", minLength = 1
},
allow_degradation = {type = "boolean", default = false}
},
required = {"conn", "burst", "default_conn_delay", "key"}
}
local _M = {
version = 0.1,
priority = 1003,
name = plugin_name,
schema = schema,
}
插件初始化信息包括插件版本,插件优先级,插件名称以及配置信息。
处理逻辑--plugin.access
先获取缓存数据,从上下文信息中根据请求参数构造出用户key,然后将这些数据结合并发数限流配置判断是否开启限流,开启了限流后请求是否在延迟处理范围内:
function _M.increase(conf, ctx)
core.log.info("ver: ", ctx.conf_version)
local lim, err = lrucache(conf, nil, create_limit_obj, conf)
if not lim then
core.log.error("failed to instantiate a resty.limit.conn object: ", err)
if conf.allow_degradation then
return
end
return 500
end
local conf_key = conf.key
local key
if conf.key_type == "var_combination" then
local err, n_resolved
key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var)
if err then
core.log.error("could not resolve vars in ", conf_key, " error: ", err)
end
if n_resolved == 0 then
key = nil
end
else
key = ctx.var[conf_key]
end
if key == nil then
core.log.info("The value of the configured key is empty, use client IP instead")
-- When the value of key is empty, use client IP instead
key = ctx.var["remote_addr"]
end
key = key .. ctx.conf_type .. ctx.conf_version
core.log.info("limit key: ", key)
local delay, err = lim:incoming(key, true)
if not delay then
if err == "rejected" then
if conf.rejected_msg then
return conf.rejected_code, { error_msg = conf.rejected_msg }
end
return conf.rejected_code or 503
end
core.log.error("failed to limit conn: ", err)
if conf.allow_degradation then
return
end
return 500
end
if lim:is_committed() then
if not ctx.limit_conn then
ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
end
core.table.insert_tail(ctx.limit_conn, lim, key, delay, conf.only_use_default_delay)
end
if delay >= 0.001 then
sleep(delay)
end
end
其中根据并发请求数判断限流开关的主体代码用的是openresty中的limit模块代码lim:incoming(key, true)
核心代码如下:
function _M.incoming(self, key, commit)
local dict = self.dict
local max = self.max
self.committed = false
local conn, err
if commit then
conn, err = dict:incr(key, 1, 0)
if not conn then
return nil, err
end
if conn > max + self.burst then
conn, err = dict:incr(key, -1)
if not conn then
return nil, err
end
return nil, "rejected"
end
self.committed = true
else
conn = (dict:get(key) or 0) + 1
if conn > max + self.burst then
return nil, "rejected"
end
end
if conn > max then
-- make the excessive connections wait
return self.unit_delay * floor((conn - 1) / max), conn
end
-- we return a 0 delay by default
return 0, conn
end
日志处理--并发数修改
function _M.decrease(conf, ctx)
local limit_conn = ctx.limit_conn
if not limit_conn then
return
end
for i = 1, #limit_conn, 4 do
local lim = limit_conn[i]
local key = limit_conn[i + 1]
local delay = limit_conn[i + 2]
local use_delay = limit_conn[i + 3]
local latency
if not use_delay then
if ctx.proxy_passed then
latency = ctx.var.upstream_response_time
else
latency = ctx.var.request_time - delay
end
end
core.log.debug("request latency is ", latency) -- for test
local conn, err = lim:leaving(key, latency)
if not conn then
core.log.error("failed to record the connection leaving request: ",
err)
break
end
end
core.tablepool.release("plugin#limit-conn", limit_conn)
ctx.limit_conn = nil
return
end
其中lim:leaving 用的是openresty中的limit模块代码:
function _M.leaving(self, key, req_latency)
assert(key)
local dict = self.dict
local conn, err = dict:incr(key, -1)
if not conn then
return nil, err
end
if req_latency then
local unit_delay = self.unit_delay
self.unit_delay = (req_latency + unit_delay) / 2
end
return conn
end
插件应用
通过dashboard可以直接开启插件,并配置相关数据