searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

APISIX源码剖析--插件limit-conn

2023-06-14 08:20:58
81
0

APISIX的流量控制插件limit-conn用于限制用户对单个服务的并发请求数,即当用户(依据请求中的一个或者多个变量)对路由的并发请求数达到了一定限制,开启限流策略,返回自定义的状态码和响应信息。

源码实现

初始化

配置初始化包括最大并发请求数,超过并发数后被延迟处理的额外并发请求数,默认处理延迟时间,以及限流请求的应答返回码和应答信息;

local plugin_name = "limit-conn"
local schema = {
    type = "object",
    properties = {
        conn = {type = "integer", exclusiveMinimum = 0},
        burst = {type = "integer",  minimum = 0},
        default_conn_delay = {type = "number", exclusiveMinimum = 0},
        only_use_default_delay = {type = "boolean", default = false},
        key = {type = "string"},
        key_type = {type = "string",
            enum = {"var", "var_combination"},
            default = "var",
        },
        rejected_code = {
            type = "integer", minimum = 200, maximum = 599, default = 503
        },
        rejected_msg = {
            type = "string", minLength = 1
        },
        allow_degradation = {type = "boolean", default = false}
    },
    required = {"conn", "burst", "default_conn_delay", "key"}
}

local _M = {
    version = 0.1,
    priority = 1003,
    name = plugin_name,
    schema = schema,
}

插件初始化信息包括插件版本,插件优先级,插件名称以及配置信息。

处理逻辑--plugin.access

先获取缓存数据,从上下文信息中根据请求参数构造出用户key,然后将这些数据结合并发数限流配置判断是否开启限流,开启了限流后请求是否在延迟处理范围内:

function _M.increase(conf, ctx)
    core.log.info("ver: ", ctx.conf_version)
    local lim, err = lrucache(conf, nil, create_limit_obj, conf)
    if not lim then
        core.log.error("failed to instantiate a resty.limit.conn object: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    local conf_key = conf.key
    local key
    if conf.key_type == "var_combination" then
        local err, n_resolved
        key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var)
        if err then
            core.log.error("could not resolve vars in ", conf_key, " error: ", err)
        end
        

        if n_resolved == 0 then
            key = nil
        end
    else
        key = ctx.var[conf_key]
    end

    if key == nil then
        core.log.info("The value of the configured key is empty, use client IP instead")
        -- When the value of key is empty, use client IP instead
        key = ctx.var["remote_addr"]
    end

    key = key .. ctx.conf_type .. ctx.conf_version
    core.log.info("limit key: ", key)

    local delay, err = lim:incoming(key, true)
    if not delay then
        if err == "rejected" then
            if conf.rejected_msg then
                return conf.rejected_code, { error_msg = conf.rejected_msg }
            end
            return conf.rejected_code or 503
        end

        core.log.error("failed to limit conn: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    if lim:is_committed() then
        if not ctx.limit_conn then
            ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
        end

        core.table.insert_tail(ctx.limit_conn, lim, key, delay, conf.only_use_default_delay)
    end

    if delay >= 0.001 then
        sleep(delay)
    end
end

其中根据并发请求数判断限流开关的主体代码用的是openresty中的limit模块代码lim:incoming(key, true)核心代码如下:

function _M.incoming(self, key, commit)
    local dict = self.dict
    local max = self.max

    self.committed = false

    local conn, err
    if commit then
        conn, err = dict:incr(key, 1, 0)
        if not conn then
            return nil, err
        end

        if conn > max + self.burst then
            conn, err = dict:incr(key, -1)
            if not conn then
                return nil, err
            end
            return nil, "rejected"
        end
        self.committed = true

    else
        conn = (dict:get(key) or 0) + 1
        if conn > max + self.burst then
            return nil, "rejected"
        end
    end

    if conn > max then
        -- make the excessive connections wait
        return self.unit_delay * floor((conn - 1) / max), conn
    end

    -- we return a 0 delay by default
    return 0, conn
end

 

日志处理--并发数修改

function _M.decrease(conf, ctx)
    local limit_conn = ctx.limit_conn
    if not limit_conn then
        return
    end

    for i = 1, #limit_conn, 4 do
        local lim = limit_conn[i]
        local key = limit_conn[i + 1]
        local delay = limit_conn[i + 2]
        local use_delay =  limit_conn[i + 3]

        local latency
        if not use_delay then
            if ctx.proxy_passed then
                latency = ctx.var.upstream_response_time
            else
                latency = ctx.var.request_time - delay
            end
        end
        core.log.debug("request latency is ", latency) -- for test

        local conn, err = lim:leaving(key, latency)
        if not conn then
            core.log.error("failed to record the connection leaving request: ",
                           err)
            break
        end
    end

    core.tablepool.release("plugin#limit-conn", limit_conn)
    ctx.limit_conn = nil
    return
end

其中lim:leaving 用的是openresty中的limit模块代码:

function _M.leaving(self, key, req_latency)
    assert(key)
    local dict = self.dict

    local conn, err = dict:incr(key, -1)
    if not conn then
        return nil, err
    end

    if req_latency then
        local unit_delay = self.unit_delay
        self.unit_delay = (req_latency + unit_delay) / 2
    end

    return conn
end

插件应用

通过dashboard可以直接开启插件,并配置相关数据

0条评论
0 / 1000
w****n
6文章数
0粉丝数
w****n
6 文章 | 0 粉丝
原创

APISIX源码剖析--插件limit-conn

2023-06-14 08:20:58
81
0

APISIX的流量控制插件limit-conn用于限制用户对单个服务的并发请求数,即当用户(依据请求中的一个或者多个变量)对路由的并发请求数达到了一定限制,开启限流策略,返回自定义的状态码和响应信息。

源码实现

初始化

配置初始化包括最大并发请求数,超过并发数后被延迟处理的额外并发请求数,默认处理延迟时间,以及限流请求的应答返回码和应答信息;

local plugin_name = "limit-conn"
local schema = {
    type = "object",
    properties = {
        conn = {type = "integer", exclusiveMinimum = 0},
        burst = {type = "integer",  minimum = 0},
        default_conn_delay = {type = "number", exclusiveMinimum = 0},
        only_use_default_delay = {type = "boolean", default = false},
        key = {type = "string"},
        key_type = {type = "string",
            enum = {"var", "var_combination"},
            default = "var",
        },
        rejected_code = {
            type = "integer", minimum = 200, maximum = 599, default = 503
        },
        rejected_msg = {
            type = "string", minLength = 1
        },
        allow_degradation = {type = "boolean", default = false}
    },
    required = {"conn", "burst", "default_conn_delay", "key"}
}

local _M = {
    version = 0.1,
    priority = 1003,
    name = plugin_name,
    schema = schema,
}

插件初始化信息包括插件版本,插件优先级,插件名称以及配置信息。

处理逻辑--plugin.access

先获取缓存数据,从上下文信息中根据请求参数构造出用户key,然后将这些数据结合并发数限流配置判断是否开启限流,开启了限流后请求是否在延迟处理范围内:

function _M.increase(conf, ctx)
    core.log.info("ver: ", ctx.conf_version)
    local lim, err = lrucache(conf, nil, create_limit_obj, conf)
    if not lim then
        core.log.error("failed to instantiate a resty.limit.conn object: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    local conf_key = conf.key
    local key
    if conf.key_type == "var_combination" then
        local err, n_resolved
        key, err, n_resolved = core.utils.resolve_var(conf_key, ctx.var)
        if err then
            core.log.error("could not resolve vars in ", conf_key, " error: ", err)
        end
        

        if n_resolved == 0 then
            key = nil
        end
    else
        key = ctx.var[conf_key]
    end

    if key == nil then
        core.log.info("The value of the configured key is empty, use client IP instead")
        -- When the value of key is empty, use client IP instead
        key = ctx.var["remote_addr"]
    end

    key = key .. ctx.conf_type .. ctx.conf_version
    core.log.info("limit key: ", key)

    local delay, err = lim:incoming(key, true)
    if not delay then
        if err == "rejected" then
            if conf.rejected_msg then
                return conf.rejected_code, { error_msg = conf.rejected_msg }
            end
            return conf.rejected_code or 503
        end

        core.log.error("failed to limit conn: ", err)
        if conf.allow_degradation then
            return
        end
        return 500
    end

    if lim:is_committed() then
        if not ctx.limit_conn then
            ctx.limit_conn = core.tablepool.fetch("plugin#limit-conn", 0, 6)
        end

        core.table.insert_tail(ctx.limit_conn, lim, key, delay, conf.only_use_default_delay)
    end

    if delay >= 0.001 then
        sleep(delay)
    end
end

其中根据并发请求数判断限流开关的主体代码用的是openresty中的limit模块代码lim:incoming(key, true)核心代码如下:

function _M.incoming(self, key, commit)
    local dict = self.dict
    local max = self.max

    self.committed = false

    local conn, err
    if commit then
        conn, err = dict:incr(key, 1, 0)
        if not conn then
            return nil, err
        end

        if conn > max + self.burst then
            conn, err = dict:incr(key, -1)
            if not conn then
                return nil, err
            end
            return nil, "rejected"
        end
        self.committed = true

    else
        conn = (dict:get(key) or 0) + 1
        if conn > max + self.burst then
            return nil, "rejected"
        end
    end

    if conn > max then
        -- make the excessive connections wait
        return self.unit_delay * floor((conn - 1) / max), conn
    end

    -- we return a 0 delay by default
    return 0, conn
end

 

日志处理--并发数修改

function _M.decrease(conf, ctx)
    local limit_conn = ctx.limit_conn
    if not limit_conn then
        return
    end

    for i = 1, #limit_conn, 4 do
        local lim = limit_conn[i]
        local key = limit_conn[i + 1]
        local delay = limit_conn[i + 2]
        local use_delay =  limit_conn[i + 3]

        local latency
        if not use_delay then
            if ctx.proxy_passed then
                latency = ctx.var.upstream_response_time
            else
                latency = ctx.var.request_time - delay
            end
        end
        core.log.debug("request latency is ", latency) -- for test

        local conn, err = lim:leaving(key, latency)
        if not conn then
            core.log.error("failed to record the connection leaving request: ",
                           err)
            break
        end
    end

    core.tablepool.release("plugin#limit-conn", limit_conn)
    ctx.limit_conn = nil
    return
end

其中lim:leaving 用的是openresty中的limit模块代码:

function _M.leaving(self, key, req_latency)
    assert(key)
    local dict = self.dict

    local conn, err = dict:incr(key, -1)
    if not conn then
        return nil, err
    end

    if req_latency then
        local unit_delay = self.unit_delay
        self.unit_delay = (req_latency + unit_delay) / 2
    end

    return conn
end

插件应用

通过dashboard可以直接开启插件,并配置相关数据

文章来自个人专栏
云原生网关
6 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0