一、概述
1、简介
k8sgpt是一个用于扫描 Kubernetes 集群、诊断和分类问题的工具,它将 SRE 编入其分析器中,通过 AI 驱动进行的问题诊断。k8sgpt中集成了很多分析器,如eventAnalyzer,ingressAnalyzer,cronJobAnalyzer,podAnalyzer,并通过LLM模型对k8s资源的failed事件进行补全、丰富及扩展,提供failed事件可能的错误原因及解决方法。
k8sgpt最大的优势是能够通过LLM模型自动生成针对各failed事件的建议,而不需要开发者针对每个failed事件编写建议集,大大提高了开发效率。但同时,依赖于LLM模型生成建议也可能产生生成的建议不准确或生成时间较长(依赖于模型性能)及需要消耗较多算力的缺点。
2、架构
二、关键原理分析
2.1 k8sgpt-operator
k8sgpt operator,用于:
- 通过cr在k8s集群中创建和管理k8sgpt后端deployment
- 调用k8sgpt deployment对集群资源进行分析,并通过LLM模型生成建议
- 获取k8sgpt生成的建议后将failed事件和建议进行组装,创建result资源进行记录
2.2 k8sgpt-deployment
k8sgpt后端,以deployment方式部署在k8s集群中,能够分析k8s各资源的failed事件,并通过LLM模型对事件进行建议补全。k8sgpt-operator创建的k8sgpt后端默认以server模式部署,提供rpc接口给operator调用及返回分析结果。k8sgpt会把已进行分析的资源放到cache中,防止重复分析造成的性能损失。
支持的分析器:
2.3 local-ai
LocalAI是一个可在本地运行的LLM模型推理程序,兼容OpenAI API,相对于openai优势主要在于:
- 它允许使用消费级硬件在本地或本地运行LLM
- 支持与ggml格式兼容的多个模型系列
- 不需要GPU
三、核心源码分析
3.1 k8sgpt-operator
- 创建k8sgpt-deployment相关资源:reconcile开始后,会首先检测集群中是否存在k8sgpt-deployment,如无,则创建对应的资源
// Check and see if the instance is new or has a K8sGPT deployment in flight
deployment := v1.Deployment{}
err = r.Get(ctx, client.ObjectKey{Namespace: k8sgptConfig.Namespace,
Name: "k8sgpt-deployment"}, &deployment)
if err != nil {
err = resources.Sync(ctx, r.Client, *k8sgptConfig, resources.Create)
if err != nil {
k8sgptReconcileErrorCount.Inc()
return r.finishReconcile(err, false)
}
}
- 请求k8sgpt-deployment,获取集群failed事件及LLM生成的建议
func (c *Client) ProcessAnalysis(deployment v1.Deployment, config *v1alpha1.K8sGPT) (*common.K8sGPTReponse, error) {
client := rpc.NewServerClient(c.conn)
req := &schemav1.AnalyzeRequest{
Explain: config.Spec.EnableAI,
Nocache: config.Spec.NoCache,
Backend: string(config.Spec.Backend),
Filters: config.Spec.Filters,
Language: "Chinese",
}
res, err := client.Analyze(context.Background(), req)
if err != nil {
return nil, fmt.Errorf("failed to call Analyze RPC: %v", err)
}
var target []v1alpha1.ResultSpec
jsonBytes, err := json.Marshal(res.Results)
if err != nil {
return nil, err
}
err = json.Unmarshal(jsonBytes, &target)
if err != nil {
return nil, err
}
response := &common.K8sGPTReponse{
Status: res.Status,
Results: target,
Problems: int(res.Problems),
}
return response, nil
}
- 创建result资源,记录分析结果
func (c *Client) ProcessAnalysis(deployment v1.Deployment, config *v1alpha1.K8sGPT) (*common.K8sGPTReponse, error) {
client := rpc.NewServerClient(c.conn)
req := &schemav1.AnalyzeRequest{
Explain: config.Spec.EnableAI,
Nocache: config.Spec.NoCache,
Backend: string(config.Spec.Backend),
Filters: config.Spec.Filters,
Language: "Chinese",
}
res, err := client.Analyze(context.Background(), req)
if err != nil {
return nil, fmt.Errorf("failed to call Analyze RPC: %v", err)
}
var target []v1alpha1.ResultSpec
jsonBytes, err := json.Marshal(res.Results)
if err != nil {
return nil, err
}
err = json.Unmarshal(jsonBytes, &target)
if err != nil {
return nil, err
}
response := &common.K8sGPTReponse{
Status: res.Status,
Results: target,
Problems: int(res.Problems),
}
return response, nil
}
3.2 k8sgpt-deployment
- 扫描集群资源,使用分析器进行分析,已经分析过的资源放入cache
// Analyze scans all namespaces for Deployments with misconfigurations
func (d DeploymentAnalyzer) Analyze(a common.Analyzer) ([]common.Result, error) {
kind := "Deployment"
apiDoc := kubernetes.K8sApiReference{
Kind: kind,
ApiVersion: schema.GroupVersion{
Group: "apps",
Version: "v1",
},
OpenapiSchema: a.OpenapiSchema,
}
AnalyzerErrorsMetric.DeletePartialMatch(map[string]string{
"analyzer_name": kind,
})
deployments, err := a.Client.GetClient().AppsV1().Deployments(a.Namespace).List(context.Background(), v1.ListOptions{})
if err != nil {
return nil, err
}
var preAnalysis = map[string]common.PreAnalysis{}
for _, deployment := range deployments.Items {
var failures []common.Failure
if *deployment.Spec.Replicas != deployment.Status.Replicas {
doc := apiDoc.GetApiDocV2("spec.replicas")
failures = append(failures, common.Failure{
Text: fmt.Sprintf("Deployment %s/%s has %d replicas but %d are available", deployment.Namespace, deployment.Name, *deployment.Spec.Replicas, deployment.Status.Replicas),
KubernetesDoc: doc,
Sensitive: []common.Sensitive{
{
Unmasked: deployment.Namespace,
Masked: util.MaskString(deployment.Namespace),
},
{
Unmasked: deployment.Name,
Masked: util.MaskString(deployment.Name),
},
}})
}
if len(failures) > 0 {
preAnalysis[fmt.Sprintf("%s/%s", deployment.Namespace, deployment.Name)] = common.PreAnalysis{
FailureDetails: failures,
Deployment: deployment,
}
AnalyzerErrorsMetric.WithLabelValues(kind, deployment.Name, deployment.Namespace).Set(float64(len(failures)))
}
}
for key, value := range preAnalysis {
var currentAnalysis = common.Result{
Kind: kind,
Name: key,
Error: value.FailureDetails,
}
a.Results = append(a.Results, currentAnalysis)
}
return a.Results, nil
}
func (a *OpenAIClient) Parse(ctx context.Context, prompt []string, cache cache.ICache) (string, error) {
inputKey := strings.Join(prompt, " ")
// Check for cached data
cacheKey := util.GetCacheKey(a.GetName(), a.language, inputKey)
if !cache.IsCacheDisabled() && cache.Exists(cacheKey) {
response, err := cache.Load(cacheKey)
if err != nil {
return "", err
}
if response != "" {
output, err := base64.StdEncoding.DecodeString(response)
if err != nil {
color.Red("error decoding cached data: %v", err)
return "", nil
}
return string(output), nil
}
}
response, err := a.GetCompletion(ctx, inputKey)
if err != nil {
return "", err
}
err = cache.Store(cacheKey, base64.StdEncoding.EncodeToString([]byte(response)))
if err != nil {
color.Red("error storing value to cache: %v", err)
return "", nil
}
return response, nil
}
- 调用LLM推理接口补全建议
func (c *OpenAIClient) GetCompletion(ctx context.Context, prompt string) (string, error) {
// Create a completion request
resp, err := c.client.CreateChatCompletion(ctx, openai.ChatCompletionRequest{
Model: c.model,
Messages: []openai.ChatCompletionMessage{
{
Role: "user",
Content: fmt.Sprintf(default_prompt, c.language, prompt),
},
},
})
if err != nil {
return "", err
}
return resp.Choices[0].Message.Content, nil
}
default_prompt = `Simplify the following Kubernetes error message delimited by triple dashes written in --- %s --- language; --- %s ---.
Provide the most possible solution in a step by step style in no more than 280 characters. Write the output in the following format:
Error: {Explain error here}
Solution: {Step by step solution here}
四、实际运行效果
如图所示,k8sgpt扫描了集群所有failed事件并生成事件根因及建议,operator把分析结果持久化为result资源,应用可以通过k8s api查询分析结果