Original

APISIX Source Code Analysis - The Circuit Breaker Plugin api-breaker

2023-06-14 03:32:34

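APISIX's api-breaker plugin implements API circuit breaking to protect upstream services. The plugin automatically counts healthy and unhealthy upstream responses and, based on the breaker configuration, decides when to open the breaker and when to close it again.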

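  • When the upstream service returns a status code listed in the unhealthy configuration (unhealthy.http_statuses) and the count reaches the configured number of failures (unhealthy.failures), the upstream is considered unhealthy and the breaker opens.

  • The first time the unhealthy state is triggered, the breaker stays open for 2 seconds. Once that time has passed, requests are forwarded to the upstream again; if it keeps returning unhealthy status codes and the count reaches the configured threshold again, the breaker opens for 4 seconds. The duration keeps doubling (2, 4, 8, 16, ...) until it reaches the configured maximum, max_breaker_sec.

  • While the upstream is unhealthy, if forwarded requests return a status code from the healthy configuration (200 by default) the configured number of times (healthy.successes), the upstream is considered recovered and the breaker closes.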
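The doubling rule can be illustrated with a short Python sketch. breaker_schedule is a hypothetical helper, not part of APISIX; it lists the break duration for the 1st to n-th time the breaker opens, capped at max_breaker_sec (300 by default in the schema).

```python
from typing import List


def breaker_schedule(trips: int, max_breaker_sec: int = 300) -> List[int]:
    """Break duration in seconds for the 1st..trips-th time the breaker opens."""
    # Each trip doubles the previous duration (2, 4, 8, ...) up to the cap.
    return [min(2 ** n, max_breaker_sec) for n in range(1, trips + 1)]


print(breaker_schedule(10))  # [2, 4, 8, 16, 32, 64, 128, 256, 300, 300]
```

With the default cap, the 9th trip is the first one that gets clamped, since 2^9 = 512 exceeds 300.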

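Implementation

Configuration schema

This part defines the plugin's configuration: the response returned while the breaker is open (status code, response headers, response body) and the breaker policy (maximum break duration, and which upstream status codes count as healthy or unhealthy). Only break_response_code is required; unhealthy status codes must fall within 500-599, healthy ones within 200-499, and max_breaker_sec must be at least 3 (300 by default).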

local schema = {
    type = "object",
    properties = {
        break_response_code = {
            type = "integer",
            minimum = 200,
            maximum = 599,
        },
        break_response_body = {
            type = "string"
        },
        break_response_headers = {
            type = "array",
            items = {
                type = "object",
                properties = {
                    key = {
                        type = "string",
                        minLength = 1
                    },
                    value = {
                        type = "string",
                        minLength = 1
                    }
                },
                required = {"key", "value"},
            }
        },
        max_breaker_sec = {
            type = "integer",
            minimum = 3,
            default = 300,
        },
        unhealthy = {
            type = "object",
            properties = {
                http_statuses = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "integer",
                        minimum = 500,
                        maximum = 599,
                    },
                    uniqueItems = true,
                    default = {500}
                },
                failures = {
                    type = "integer",
                    minimum = 1,
                    default = 3,
                }
            },
            default = {http_statuses = {500}, failures = 3}
        },
        healthy = {
            type = "object",
            properties = {
                http_statuses = {
                    type = "array",
                    minItems = 1,
                    items = {
                        type = "integer",
                        minimum = 200,
                        maximum = 499,
                    },
                    uniqueItems = true,
                    default = {200}
                },
                successes = {
                    type = "integer",
                    minimum = 1,
                    default = 3,
                }
            },
            default = {http_statuses = {200}, successes = 3}
        }
    },
    required = {"break_response_code"},
}
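To see what the schema means for a concrete configuration, here is a Python sketch of how its defaults and main range checks apply. normalize_conf is a hypothetical helper, not APISIX's validator (APISIX checks plugin configs against this schema itself); it assumes nested defaults are filled field by field, and it does not check break_response_headers.

```python
import copy

# Defaults copied from the schema above.
DEFAULTS = {
    "max_breaker_sec": 300,
    "unhealthy": {"http_statuses": [500], "failures": 3},
    "healthy": {"http_statuses": [200], "successes": 3},
}


def normalize_conf(user_conf: dict) -> dict:
    """Fill schema defaults and enforce the main constraints (hypothetical helper)."""
    conf = copy.deepcopy(user_conf)

    # break_response_code is the only required field: integer in 200..599.
    code = conf.get("break_response_code")
    if not isinstance(code, int) or not 200 <= code <= 599:
        raise ValueError("break_response_code is required and must be in 200..599")

    conf.setdefault("max_breaker_sec", DEFAULTS["max_breaker_sec"])
    if conf["max_breaker_sec"] < 3:
        raise ValueError("max_breaker_sec must be >= 3")

    # unhealthy statuses live in 500..599, healthy ones in 200..499.
    for section, lo, hi, counter in (("unhealthy", 500, 599, "failures"),
                                     ("healthy", 200, 499, "successes")):
        merged = copy.deepcopy(DEFAULTS[section])
        merged.update(conf.get(section, {}))
        statuses = merged["http_statuses"]
        if not statuses or len(set(statuses)) != len(statuses):
            raise ValueError(f"{section}.http_statuses must be non-empty and unique")
        if any(not lo <= s <= hi for s in statuses):
            raise ValueError(f"{section}.http_statuses must be in {lo}..{hi}")
        if merged[counter] < 1:
            raise ValueError(f"{section}.{counter} must be >= 1")
        conf[section] = merged

    return conf
```

For example, a config that only sets break_response_code to 502 and unhealthy.failures to 2 ends up with max_breaker_sec 300, unhealthy.http_statuses [500], and healthy statuses [200] with 3 successes.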

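Request handling (access phase)

The access phase uses the health statistics collected for the upstream to decide whether the breaker is open, that is, whether the current request may proceed. It reads the unhealthy counter and the timestamp of the last trip, computes the break duration (2 raised to the number of times the failure threshold has been reached, capped at max_breaker_sec), and checks whether the current time still falls inside the break window. If it does, the plugin returns break_response_code directly, together with break_response_body and break_response_headers when a body is configured; otherwise the request is processed normally.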

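-- shared_buffer (an ngx.shared dict) and the gen_*_key helpers used below
-- are defined elsewhere in the plugin file
function _M.access(conf, ctx)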
    local unhealthy_key = gen_unhealthy_key(ctx)
    -- unhealthy counts
    local unhealthy_count, err = shared_buffer:get(unhealthy_key)
    if err then
        core.log.warn("failed to get unhealthy_key: ",
                      unhealthy_key, " err: ", err)
        return
    end

    if not unhealthy_count then
        return
    end

    -- timestamp of the last time an unhealthy state was triggered
    local lasttime_key = gen_lasttime_key(ctx)
    local lasttime, err = shared_buffer:get(lasttime_key)
    if err then
        core.log.warn("failed to get lasttime_key: ",
                      lasttime_key, " err: ", err)
        return
    end

    if not lasttime then
        return
    end

    local failure_times = math.ceil(unhealthy_count / conf.unhealthy.failures)
    if failure_times < 1 then
        failure_times = 1
    end

    -- cannot exceed the maximum value of the user configuration
    local breaker_time = 2 ^ failure_times
    if breaker_time > conf.max_breaker_sec then
        breaker_time = conf.max_breaker_sec
    end
    core.log.info("breaker_time: ", breaker_time)

    -- breaker
    if lasttime + breaker_time >= ngx.time() then
        if conf.break_response_body then
            if conf.break_response_headers then
                for _, value in ipairs(conf.break_response_headers) do
                    local val = core.utils.resolve_var(value.value, ctx.var)
                    core.response.add_header(value.key, val)
                end
            end
            return conf.break_response_code, conf.break_response_body
        end
        return conf.break_response_code
    end

    return
end
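The decision made in _M.access can be replayed in Python. This is a sketch, not APISIX code: the shared-dict lookups are replaced by plain arguments, break_response_code=502 is an illustrative default (the schema requires the user to set it), and response headers and body are left out.

```python
import math
from typing import Optional


def breaker_response(unhealthy_count: Optional[int], lasttime: Optional[int],
                     now: int, failures: int = 3, max_breaker_sec: int = 300,
                     break_response_code: int = 502) -> Optional[int]:
    """Return the break status code if the request is short-circuited, else None."""
    # No counter or no trip timestamp: the breaker is closed.
    if unhealthy_count is None or lasttime is None:
        return None
    # How many times the failure threshold has been reached (at least 1).
    failure_times = max(1, math.ceil(unhealthy_count / failures))
    # 2, 4, 8, ... seconds, capped at max_breaker_sec.
    breaker_time = min(2 ** failure_times, max_breaker_sec)
    # Same inclusive check as `lasttime + breaker_time >= ngx.time()`.
    if lasttime + breaker_time >= now:
        return break_response_code
    return None
```

Because ngx.time() has one-second resolution and the comparison uses >=, the effective break window can be up to one second longer than breaker_time.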

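Log phase: health statistics

Health statistics are collected in the log phase, after the upstream has responded. The plugin looks at the upstream status code to decide whether the response was healthy. For an unhealthy status it increments the unhealthy counter, clears the healthy counter, and every time the counter reaches a multiple of unhealthy.failures records the current time as the trip timestamp, which opens the breaker. For a healthy status, counted only while an unhealthy counter exists, it increments the healthy counter and, once that reaches healthy.successes, deletes the trip timestamp and both counters, restoring the upstream to normal and closing the breaker. In short, this code is responsible for opening and closing the breaker.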

function _M.log(conf, ctx)
    local unhealthy_key = gen_unhealthy_key(ctx)
    local healthy_key = gen_healthy_key(ctx)
    local upstream_status = core.response.get_upstream_status(ctx)

    if not upstream_status then
        return
    end

    -- unhealthy process
    if core.table.array_find(conf.unhealthy.http_statuses,
                             upstream_status)
    then
        local unhealthy_count, err = shared_buffer:incr(unhealthy_key, 1, 0)
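        if err then
            core.log.warn("failed to incr unhealthy_key: ", unhealthy_key,
                          " err: ", err)
            -- incr failed and unhealthy_count is nil: stop here instead of
            -- hitting a nil arithmetic error in the modulo check below
            return
        end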
        core.log.info("unhealthy_key: ", unhealthy_key, " count: ",
                      unhealthy_count)

        shared_buffer:delete(healthy_key)

        -- each time the count reaches a multiple of unhealthy.failures, record
        -- the time the unhealthy (breaker-open) state was entered; the key
        -- expires after max_breaker_sec
        if unhealthy_count % conf.unhealthy.failures == 0 then
            shared_buffer:set(gen_lasttime_key(ctx), ngx.time(),
                              conf.max_breaker_sec)
            core.log.info("update unhealthy_key: ", unhealthy_key, " to ",
                          unhealthy_count)
        end

        return
    end

    -- healthy process
    if not core.table.array_find(conf.healthy.http_statuses, upstream_status) then
        return
    end

    local unhealthy_count, err = shared_buffer:get(unhealthy_key)
    if err then
        core.log.warn("failed to `get` unhealthy_key: ", unhealthy_key,
                      " err: ", err)
    end

    if not unhealthy_count then
        return
    end

    local healthy_count, err = shared_buffer:incr(healthy_key, 1, 0)
    if err then
        core.log.warn("failed to `incr` healthy_key: ", healthy_key,
                      " err: ", err)
        -- healthy_count is nil on failure; skip the comparison below
        return
    end

    -- clear related status
    if healthy_count >= conf.healthy.successes then
        -- state changes back to normal
        core.log.info("change to normal, ", healthy_key, " ", healthy_count)
        shared_buffer:delete(gen_lasttime_key(ctx))
        shared_buffer:delete(unhealthy_key)
        shared_buffer:delete(healthy_key)
    end

    return
end
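The counting in _M.log can be modelled the same way. BreakerCounters is a hypothetical Python class: a plain dict stands in for the shared dict, a single route is assumed (so the keys are fixed names instead of gen_*_key values), and the max_breaker_sec expiry on the timestamp key is ignored.

```python
from typing import Dict, Iterable


class BreakerCounters:
    """Dict-backed model of the counters _M.log keeps (one route, no TTLs)."""

    def __init__(self, unhealthy_statuses: Iterable[int] = (500,), failures: int = 3,
                 healthy_statuses: Iterable[int] = (200,), successes: int = 3):
        self.unhealthy_statuses = set(unhealthy_statuses)
        self.failures = failures
        self.healthy_statuses = set(healthy_statuses)
        self.successes = successes
        self.store: Dict[str, int] = {}

    def log(self, upstream_status: int, now: int) -> None:
        if upstream_status in self.unhealthy_statuses:
            count = self.store.get("unhealthy", 0) + 1  # like incr(key, 1, 0)
            self.store["unhealthy"] = count
            self.store.pop("healthy", None)  # a failure resets the success streak
            if count % self.failures == 0:
                self.store["lasttime"] = now  # breaker (re)opens at `now`
            return
        if upstream_status not in self.healthy_statuses:
            return  # status in neither list: ignored
        if "unhealthy" not in self.store:
            return  # no failure history, nothing to recover from
        healthy = self.store.get("healthy", 0) + 1
        self.store["healthy"] = healthy
        if healthy >= self.successes:
            # Back to normal: drop the timestamp and both counters.
            for key in ("lasttime", "unhealthy", "healthy"):
                self.store.pop(key, None)
```

In the Lua code above, the unhealthy counter has no expiry (incr is called without an init_ttl) and is only deleted once healthy.successes is reached; the accumulated count is what makes successive break durations double.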

Using the plugin

The plugin can be configured and enabled through the APISIX Dashboard UI.
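Besides the Dashboard, the plugin can be enabled on a route through the APISIX Admin API. The Python sketch below only builds the PUT request without sending it. It assumes an APISIX 3.x Admin API on its default address (127.0.0.1:9180); the route ID 1, the /hello URI, the upstream node 127.0.0.1:1980 and the admin key are placeholders, and the breaker values are illustrative.

```python
import json
import urllib.request


def build_route_request(admin_key: str,
                        admin_url: str = "http://127.0.0.1:9180/apisix/admin/routes/1"):
    """Build (but do not send) an Admin API request enabling api-breaker on a route."""
    body = {
        "uri": "/hello",
        "plugins": {
            "api-breaker": {
                "break_response_code": 502,
                "unhealthy": {"http_statuses": [500, 503], "failures": 3},
                "healthy": {"http_statuses": [200], "successes": 1},
            }
        },
        "upstream": {"type": "roundrobin", "nodes": {"127.0.0.1:1980": 1}},
    }
    return urllib.request.Request(
        admin_url,
        data=json.dumps(body).encode("utf-8"),
        headers={"X-API-KEY": admin_key, "Content-Type": "application/json"},
        method="PUT",
    )


req = build_route_request(admin_key="your-admin-key")
# urllib.request.urlopen(req) would send it to a running APISIX instance.
```

With this configuration, three upstream 500 or 503 responses open the breaker, requests receive 502 during the break window, and a single 200 afterwards closes it.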

Author: w****n
From the personal column: Cloud Native Gateway (云原生网关)