
k8sgpt Analysis

2023-06-29 09:16:54

1. Overview

1.1 Introduction

k8sgpt is a tool for scanning Kubernetes clusters and diagnosing and triaging their problems. It codifies SRE experience into its analyzers and uses an AI backend to drive problem diagnosis. k8sgpt integrates many analyzers, such as eventAnalyzer, ingressAnalyzer, cronJobAnalyzer, and podAnalyzer, and uses an LLM to complete, enrich, and extend the failed events of Kubernetes resources, supplying a likely root cause and a suggested fix for each failed event.
k8sgpt's biggest advantage is that the LLM generates a recommendation for every failed event automatically, so developers do not need to hand-write a recommendation set for each kind of failure, which greatly improves development efficiency. The trade-offs of relying on an LLM are that the generated recommendations may be inaccurate, generation may be slow (depending on model performance), and inference consumes considerable compute.

 

1.2 Architecture

 

2. Key Mechanisms

2.1 k8sgpt-operator

The k8sgpt operator is used to:

  1. Create and manage the k8sgpt backend deployment in the Kubernetes cluster via a CR (see the sketch after this list)
  2. Call the k8sgpt deployment to analyze cluster resources and generate recommendations through the LLM
  3. Assemble the failed events with the returned recommendations and record them by creating Result resources
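
For orientation, here is a minimal sketch of such a CR, constructed in Go. It uses only the Spec fields that are visible in the ProcessAnalysis code in section 3.1 (EnableAI, NoCache, Backend, Filters); the exact type and field names are assumptions, not the operator's verbatim API.

// Sketch only: a K8sGPT CR that the operator reconciles into a backend deployment.
// Field names are inferred from config.Spec usage in ProcessAnalysis (section 3.1).
k8sgptConfig := &v1alpha1.K8sGPT{
    ObjectMeta: metav1.ObjectMeta{
        Name:      "k8sgpt-sample",
        Namespace: "k8sgpt-operator-system",
    },
    Spec: v1alpha1.K8sGPTSpec{
        EnableAI: true,                          // ask the backend to explain failures via the LLM
        NoCache:  false,                         // allow cached analyses to be reused
        Backend:  "localai",                     // assumed backend name for a LocalAI deployment
        Filters:  []string{"Pod", "Deployment"}, // limit which analyzers run
    },
}
// Creating the CR triggers the reconcile loop shown in section 3.1.
err := r.Create(ctx, k8sgptConfig)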

 

2.2 k8sgpt-deployment

The k8sgpt backend is deployed in the Kubernetes cluster as a deployment. It analyzes the failed events of the various Kubernetes resources and uses the LLM to complete a recommendation for each event. The backend created by k8sgpt-operator runs in server mode by default, exposing an rpc interface that the operator calls to trigger analysis and retrieve results. k8sgpt places already-analyzed resources in a cache to avoid the performance cost of repeated analysis.
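
On the operator side this rpc interface is plain gRPC. A hedged sketch of how the connection behind the Client used in section 3.1 might be established; the in-cluster service name and port are assumptions, while the grpc-go calls are standard:

// Sketch: dialing the k8sgpt backend's rpc endpoint from the operator.
conn, err := grpc.Dial("k8sgpt.k8sgpt-operator-system:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
    log.Fatalf("failed to dial k8sgpt backend: %v", err)
}
c := &Client{conn: conn} // the conn consumed by rpc.NewServerClient in ProcessAnalysis (3.1)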

Supported analyzers include, among others, the eventAnalyzer, ingressAnalyzer, cronJobAnalyzer, and podAnalyzer mentioned above.

 

2.3 local-ai

LocalAI is an LLM inference server that runs locally and is compatible with the OpenAI API (see the sketch after this list). Its main advantages over OpenAI are:

  1. It lets you run LLMs locally or on-premises on consumer-grade hardware
  2. It supports multiple model families compatible with the ggml format
  3. It does not require a GPU
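
Because LocalAI speaks the OpenAI wire format, a client only needs to point at its endpoint. A minimal, self-contained sketch in Go; the listen address and model name are assumptions about the local setup:

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    // Chat-completion request body in the OpenAI format that LocalAI accepts.
    body, _ := json.Marshal(map[string]any{
        "model": "ggml-gpt4all-j", // whichever model the LocalAI instance has loaded
        "messages": []map[string]string{
            {"role": "user", "content": "Explain: Back-off restarting failed container"},
        },
    })

    // LocalAI's address here is an assumption (default local deployment).
    resp, err := http.Post("http://localhost:8080/v1/chat/completions",
        "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    var out struct {
        Choices []struct {
            Message struct {
                Content string `json:"content"`
            } `json:"message"`
        } `json:"choices"`
    }
    if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
        panic(err)
    }
    fmt.Println(out.Choices[0].Message.Content)
}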

 

3. Core Source Code Analysis

3.1 k8sgpt-operator

  • Creating the k8sgpt-deployment resources: at the start of reconcile, the operator first checks whether a k8sgpt-deployment exists in the cluster and, if not, creates the corresponding resources
    // Check and see if the instance is new or has a K8sGPT deployment in flight
    deployment := v1.Deployment{}
    err = r.Get(ctx, client.ObjectKey{Namespace: k8sgptConfig.Namespace,
        Name: "k8sgpt-deployment"}, &deployment)
    if err != nil {
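        // Any Get error (typically NotFound) is treated as "no backend yet" and triggers creation.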

        err = resources.Sync(ctx, r.Client, *k8sgptConfig, resources.Create)
        if err != nil {
            k8sgptReconcileErrorCount.Inc()
            return r.finishReconcile(err, false)
        }
    }
  • Requesting the k8sgpt-deployment to obtain the cluster's failed events together with the LLM-generated recommendations
func (c *Client) ProcessAnalysis(deployment v1.Deployment, config *v1alpha1.K8sGPT) (*common.K8sGPTReponse, error) {

    client := rpc.NewServerClient(c.conn)

    req := &schemav1.AnalyzeRequest{
        Explain:  config.Spec.EnableAI,
        Nocache:  config.Spec.NoCache,
        Backend:  string(config.Spec.Backend),
        Filters:  config.Spec.Filters,
        Language: "Chinese",
    }

    res, err := client.Analyze(context.Background(), req)
    if err != nil {
        return nil, fmt.Errorf("failed to call Analyze RPC: %v", err)
    }

    var target []v1alpha1.ResultSpec
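    // Convert the rpc schema results into the Result CR's spec type via a JSON round-trip.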

    jsonBytes, err := json.Marshal(res.Results)
    if err != nil {
        return nil, err
    }

    err = json.Unmarshal(jsonBytes, &target)
    if err != nil {
        return nil, err
    }

    response := &common.K8sGPTReponse{
        Status:   res.Status,
        Results:  target,
        Problems: int(res.Problems),
    }
    return response, nil
}
  • Creating Result resources to record the analysis results
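
The operator then writes each entry of the returned []v1alpha1.ResultSpec into a Result custom resource. A minimal sketch of that step, assuming a v1alpha1.Result type that wraps ResultSpec and the same controller-runtime client as above; this is illustrative, not the operator's verbatim code:

// Sketch only: persisting analysis results as Result CRs.
// The Result type shape and the naming scheme are assumptions.
for _, spec := range response.Results {
    result := v1alpha1.Result{
        ObjectMeta: metav1.ObjectMeta{
            Name:      strings.ToLower(strings.ReplaceAll(spec.Name, "/", "")),
            Namespace: k8sgptConfig.Namespace,
        },
        Spec: spec,
    }
    if err := r.Create(ctx, &result); err != nil {
        if errors.IsAlreadyExists(err) {
            continue // this failure was already recorded on a previous reconcile
        }
        return r.finishReconcile(err, false)
    }
}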

 

3.2 k8sgpt-deployment

  • Scanning cluster resources and analyzing them with the analyzers; already-analyzed inputs are served from the cache
// Analyze scans all namespaces for Deployments with misconfigurations
func (d DeploymentAnalyzer) Analyze(a common.Analyzer) ([]common.Result, error) {

    kind := "Deployment"
    apiDoc := kubernetes.K8sApiReference{
        Kind: kind,
        ApiVersion: schema.GroupVersion{
            Group:   "apps",
            Version: "v1",
        },
        OpenapiSchema: a.OpenapiSchema,
    }

    AnalyzerErrorsMetric.DeletePartialMatch(map[string]string{
        "analyzer_name": kind,
    })

    deployments, err := a.Client.GetClient().AppsV1().Deployments(a.Namespace).List(context.Background(), v1.ListOptions{})
    if err != nil {
        return nil, err
    }
    var preAnalysis = map[string]common.PreAnalysis{}

    for _, deployment := range deployments.Items {
        var failures []common.Failure
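        // Flag a failure when the desired replica count differs from the observed replicas.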
        if *deployment.Spec.Replicas != deployment.Status.Replicas {
            doc := apiDoc.GetApiDocV2("spec.replicas")

            failures = append(failures, common.Failure{
                Text:          fmt.Sprintf("Deployment %s/%s has %d replicas but %d are available", deployment.Namespace, deployment.Name, *deployment.Spec.Replicas, deployment.Status.Replicas),
                KubernetesDoc: doc,
                Sensitive: []common.Sensitive{
                    {
                        Unmasked: deployment.Namespace,
                        Masked:   util.MaskString(deployment.Namespace),
                    },
                    {
                        Unmasked: deployment.Name,
                        Masked:   util.MaskString(deployment.Name),
                    },
                }})
        }
        if len(failures) > 0 {
            preAnalysis[fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)] = common.PreAnalysis{
                FailureDetails: failures,
                Deployment:     deployment,
            }
            AnalyzerErrorsMetric.WithLabelValues(kind, deployment.Name, deployment.Namespace).Set(float64(len(failures)))
        }

    }

    for key, value := range preAnalysis {
        var currentAnalysis = common.Result{
            Kind:  kind,
            Name:  key,
            Error: value.FailureDetails,
        }

        a.Results = append(a.Results, currentAnalysis)
    }

    return a.Results, nil
}
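
Each analyzer follows this same pattern: list the resources, collect Failure entries per object, and emit common.Result items. When Explain is enabled in the AnalyzeRequest, the server hands each result's failure text to the AI backend's Parse method (below), which consults the cache before calling the model.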

func (a *OpenAIClient) Parse(ctx context.Context, prompt []string, cache cache.ICache) (string, error) {
    inputKey := strings.Join(prompt, " ")
    // Check for cached data
    cacheKey := util.GetCacheKey(a.GetName(), a.language, inputKey)

    if !cache.IsCacheDisabled() && cache.Exists(cacheKey) {
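        // Cache hit path: stored responses are base64-encoded strings.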
        response, err := cache.Load(cacheKey)
        if err != nil {
            return "", err
        }

        if response != "" {
            output, err := base64.StdEncoding.DecodeString(response)
            if err != nil {
                color.Red("error decoding cached data: %v", err)
                return "", nil
            }
            return string(output), nil
        }
    }

    response, err := a.GetCompletion(ctx, inputKey)
    if err != nil {
        return "", err
    }

    err = cache.Store(cacheKey, base64.StdEncoding.EncodeToString([]byte(response)))

    if err != nil {
        color.Red("error storing value to cache: %v", err)
        return "", nil
    }

    return response, nil
}

 

  • Calling the LLM inference interface to complete the recommendation
func (c *OpenAIClient) GetCompletion(ctx context.Context, prompt string) (string, error) {
    // Create a completion request
    resp, err := c.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
        Model: c.model,
        Messages: []openai.ChatCompletionMessage{
            {
                Role:    "user",
                Content: fmt.Sprintf(default_prompt, c.language, prompt),
            },
        },
    })
    if err != nil {
        return "", err
    }
    return resp.Choices[0].Message.Content, nil
}

const default_prompt = `Simplify the following Kubernetes error message delimited by triple dashes written in --- %s --- language; --- %s ---.
Provide the most possible solution in a step by step style in no more than 280 characters. Write the output in the following format:
Error: {Explain error here}
Solution: {Step by step solution here}`
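
Substituting the configured language and a concrete failure text shows what actually reaches the model; a small illustration (the failure message is an example matching the DeploymentAnalyzer output above):

// Illustration: the prompt GetCompletion sends for a typical analyzer failure.
prompt := fmt.Sprintf(default_prompt, "Chinese",
    "Deployment default/nginx has 3 replicas but 1 are available")
// prompt now reads:
//   Simplify the following Kubernetes error message delimited by triple dashes
//   written in --- Chinese --- language; --- Deployment default/nginx has 3
//   replicas but 1 are available ---. ...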

 

4. Results in Practice

As shown in the figure, k8sgpt scans all failed events in the cluster and generates a root cause and recommendation for each; the operator persists the analysis results as Result resources, which applications can then query through the Kubernetes API.
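
A sketch of such a query using client-go's dynamic client; the Result CRD's group and version ("core.k8sgpt.ai/v1alpha1") are assumptions:

// Sketch: listing k8sgpt Result resources through the Kubernetes API.
gvr := schema.GroupVersionResource{
    Group:    "core.k8sgpt.ai", // assumed CRD group
    Version:  "v1alpha1",
    Resource: "results",
}
results, err := dynamicClient.Resource(gvr).Namespace("k8sgpt-operator-system").
    List(context.Background(), metav1.ListOptions{})
if err != nil {
    panic(err)
}
for _, item := range results.Items {
    // Each item carries the failure details and the LLM recommendation in its spec.
    fmt.Println(item.GetName())
}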
