1. 前言
安装使用Go SDK可以帮助开发者快速接入使用天翼云的日志服务相关功能,目前支持同步上传,异步批量上传等功能。
2. 使用条件
2.1. 先决条件
用户需要具备以下条件才能够使用云日志服务 SDK Go版本:
1、已开通云日志服务,并创建了日志项目和日志单元,获取到相应编码(logProject、logUnit)。
2、已获取AccessKey 和 SecretKey。
3、已安装Go开发环境,推荐安装Go1.19或以上版本。
2.2. 下载及安装
下载ctyun_lts_go_sdk.zip压缩包,放到相应位置后并解压。“ctyun_lts_go_sdk”目录中“example”为SDK的使用示例代码。
1、将解压后的代码包的"lts"目录整个复制到您的项目中。
2、更新依赖项,在您的go项目中执行命令:
go mod tidy
3、当您的代码中要使用SDK时,在代码的import中,添加lts包。假设您的项目为lts_sdk_go_demo,则进行如下操作。这样您就可以顺利使用SDK的功能:
import(
"lts_sdk_go_demo/lts"
)
如果您想直接使用SDK,则可以进行如下步骤处理:
4、进入到ctyun_lts_go_sdk 目录下,删除go.mod 、go.sum文件。
5、执行命令,初始化您的项目:
go mod init ctyun_lts_go_sdk
6、接着执行命令:
go mod tidy
7、进入example目录下,运行sample_putlogs.go示例
go run sample_putlogs.go
8、或者进入example目录下,构建您的go项目,执行命令,以生成二进制文件。
go build -o sample_putlogs sample_putlogs.go (linux 环境)
go build -o sample_putlogs.exe sample_putlogs.go (windows 环境)
9、运行sample_putlogs示例:
./sample_putlogs (linux 环境)
sample_putlogs.exe (windows 环境)
3. SDK基本使用
3.1. 基本使用
使用 SDK访问云日志服务,需要设置正确的 AccessKey、SecretKey 和endpoint,所有的服务可以使用同一 key 凭证来进行访问,但不同的服务地区需要使用不同的 endpoint 进行访问,详情参考天翼云官网-SDK接入概述。在调用前SDK,需要已知以下参数:
- 云日志服务访问地址。详情请查看访问地址(Endpoint)。
- key凭证:accessKey和secretKey 。详情请查看如何获取访问密钥(AK/SK)。
- 日志项目编码:logProject,在使用SDK前,需要确保您有至少一个已经存在的日志项目。
- 日志单元编码:logUnit,在使用SDK前,需要确保日志项目中有至少一个已经存在的日志单元。
- 待上传的日志:logItem,在使用SDK前,需要确保日志已按照特定格式组织。
参数 | 参数类型 | 描述 | 是否必须 |
---|---|---|---|
endpoint | string | 域名 | 是 |
accessKey | string | AccessKey,简称ak | 是 |
secretKey | string | SecretKey ,简称sk | 是 |
logProject | string | 日志项目编码 | 是 |
logUnit | string | 日志单元编码 | 是 |
目前通过SDK将日志上传有两种上传形式:同步上传和异步批量上传。
1、同步上传:当调用日志上传接口时,sdk会立即进行http请求调用,并返回发送结果。这种方式结构简单,可用于发送频率不高的场景。
2、异步批量上传:当调用日志上传接口时,后台线程会将日志进行累积,当达到发送条件时,会进行一次合并发送。对于需要频繁调用发送接口的场景,这种方式性能更卓越,更高效。
示例代码:同步上传
import (
"ctyun_lts_go_sdk/lts"
"fmt"
"time"
)
func main() {
ak := "your accessKey"
sk := "your secretKey"
logProject := "log project Code"
logUnit := "log unit Code"
endpoint := "https://guizhou-lts.ctyun.cn"
clientConfig := lts.GetDefaultClientConfig(endpoint, ak, sk, logProject)
client, err := lts.CreateClient(clientConfig)
if err != nil {
fmt.Println("client initialization failed, err:", err)
return
}
logTimestamp := time.Now().UnixNano()
originMsg := "go sdk test oriMessage"
contents := make(map[string]any)
contents["contentInt"] = logTimestamp
contents["contentString"] = "contents test string"
contents["contentDouble"] = 3.1415926
contents["contentBool"] = "true"
labels := make(map[string]any)
labels["user_tag"] = "string"
logItem := lts.GenerateLogItem(logTimestamp, originMsg, contents, labels)
logItems := lts.NewLogItems()
for i := 0; i < 10; i++ {
logItems.AddLogItem(logItem)
}
for j := 0; j < 100; j++ {
response, err := client.PutLogs(logProject, logUnit, logItems)
if err != nil {
fmt.Println("Put log failed: ", err)
} else {
fmt.Println("Response message: ", response.Message)
}
}}
示例代码:异步批量上传
func main() {
ak := "your accessKey"
sk := "your secretKey"
logProject := "log project Code"
logUnit := "log unit code"
endpoint := "your endpoint"
producerConfig := lts.GetDefaultProducerConfig()
producerConfig.AllowLogLevel = "warn"
producer := lts.InitProducer(producerConfig)
// 可以根据clientConfig 创建多个client,client间互不影响
clientConfig := lts.GetDefaultClientConfig(endpoint, ak, sk, logProject)
err := producer.BuildClient(clientConfig)
if err != nil {
fmt.Println("client initialization failed, err:", err)
return
}
producer.Start()
var m sync.WaitGroup
for i := 0; i < 3; i++ {
m.Add(1)
go func() {
defer m.Done()
for i := 0; i < 5; i++ {
log := GenerateLog()
logList := GenerateLogList()
//不接收发送结果,无回调
err = producer.SendLogs(logProject, logUnit, log)
// 接收发送结果,回调结果
//err = producer.SendLogListWithCallBack(logProject, logUnit, log)
if err != nil {
fmt.Println(err)
}
}
}()}
m.Wait()
time.Sleep(10 * time.Second)
producer.SafeClose()
}
4. 服务代码示例-同步上传
4.1. 关于Client的操作
4.1.1. GetDefaultClientConfig()
此操作是获取Client的默认配置。至少需要4个关键的参数,config配置如下:
参数 | 参数类型 | 描述 | 是否必须 |
---|---|---|---|
Endpoint | string | 域名 | 是 |
AccessKey | string | AccessKey,简称ak | 是 |
SecretKey | string | SecretKey ,简称sk | 是 |
LogProject | string | LogProject,日志项目编码 | 是 |
UserAgent | string | lts-sdk-go/{go版本信息} | 否 |
RequestTimeOut | time.Duration | 请求超时时间,默认60s | 否 |
RetryTimeOut | time.Duration | 重试超时时间,默认90s | 否 |
CompressType | string | 日志压缩方式,默认lz4 | 否 |
示例代码:获取ClinetConfig配置
clientConfig := lts.GetDefaultClientConfig(endpoint, ak, sk, logProject)
//也可以进行自定义参数值
clientConfig.RequestTimeOut=30 * time.Second
clientConfig.UserAgent=”log-sdk-go/1.6.0’
4.1.2. CreateClient
此操作是根据clientConfig创建一个Client。之后,可以进行Client初始化,其中有三个主要步骤:初始化client示例、初始化HTTPClient用于发送http请求、通过accessKey和secretKey获取临时凭证Token,关于HTTPClient中的参数,用户也可以根据项目的需要自行修改。
示例代码:初始化生成Client
//初始化 Client
client, err := lts.CreateClient(clientConfig)
//CreateClient()的实现
func CreateClient(clientConfig *ClientConfig) (*Client, error) {
client := &Client{
Endpoint: clientConfig.Endpoint,
AccessKey: clientConfig.AccessKey,
SecretKey: clientConfig.SecretKey,
CompressType: clientConfig.CompressType,
RequestTimeOut: clientConfig.RequestTimeOut,
RetryTimeOut: clientConfig.RetryTimeOut,
UserAgent: clientConfig.UserAgent,
HTTPClient: &http.Client{
Timeout: 30 * time.Second,
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 90 * time.Second,
}).DialContext,
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
IdleConnTimeout: 90 * time.Second,
DisableKeepAlives: false,
},
},
}
if err := CheckConfig(client); err != nil {
return nil, err
}
err := client.SetToken()
if err != nil {
return nil, err
}
return client, nil
}
4.2. 关于临时凭证Token的操作
4.2.1. akskToToken()
此操作是为client注入token信息,这一步需要使用ak和sk信息换取临时凭证TokenInfo,其中包含了token随机串和过期时间两个参数。这一步需要去访问CTIAM的api接口,调用api接口,传入ak/sk/endpoint信息,返回token信息。
TokenInfo信息如下:
参数 | 类型 | 描述 |
---|---|---|
Token | string | token 随机串 |
ExpireTime | int64 | 过期时间,默认30分钟 |
获取TokenInfo这一步在Client 初始化时会自动调用akskToToken,用户默认可以不用进行这一步操作。
示例代码:使用ak,sk去获取临时凭证token
func (c *Client) SetToken() (err error) {
tokenInfo, err := akskToToken(c.Config.AccessKey, c.Config.SecretKey, c.Config.Endpoint)
if err != nil {
return err
}
c.SecurityToken = tokenInfo
return nil
}
4.3. 关于Log的操作
4.3.1. logItems.AddLogItem(logItem)
此操作用于生成待上传的日志,日志上传只能上传LogItem格式的日志,logItems是一个数组类型,里面包含若干条LogItem日志,格式如下:
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
logItems | []LogItem | LogItem格式的数组,将多份日志组合起来发送 | 是 |
其中LogItem类型需要的参数如下:
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
LogTimestamp | int | 时间戳,单位纳秒 | 是 |
OriginMsg | string | 原始日志内容 | 是 |
Contents | map[string]any | 日志内容,分词后的内容 | 否 |
Labels | map[string]any | 自定义标签 | 否 |
注意:其中Contents和Labels的key的长度不超过64字符,仅支持数字、字母、下划线、连字符(-)、点(.),且必须以字母开头。value类型最好使用字符串(string)和数字类型(int,double),其他类型建议先转为字符串类型,并且value值不能为空或空字符串。
示例代码:组装生成1条日志
logTimestamp := time.Now().UnixNano()
originMsg := "go sdk test oriMessage"
contents := make(map[string]any)
contents["contentInt"] = logTimestamp
contents["contentString"] = "contents test string"
contents["contentDouble"] = 3.1415926
contents["contentBool"] = "true"
labels := make(map[string]any)
labels["user_tag"] = "string"
logItem := lts.GenerateLogItem(logTimestamp, originMsg, contents, labels)
logItems := lts.NewLogItems()
logItems.AddLogItem(logItem)
4.4. 关于日志上传的操作
4.4.1. PutLogs()
此操作用于日志上传服务,需要传入的参数有三个,分别是logProject(日志项目编码),logUnit(日志单元编码),logItems(要上传的日志)。
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
logProject | string | 日志项目编码 | 是 |
logUnit | string | 日志单元编码 | 是 |
logItems | []LogItem | 日志信息 | 是 |
示例代码:上传日志,默认使用lz4压缩
response, err := client.PutLogs(logProject, logUnit, logItems)
if err != nil {
fmt.Println("Put log failed: ", err.Error())
} else {
fmt.Println("message: ", response.Message)
}
response 是Response格式的返回响应体,如下表格式:
参数 | 类型 | 描述 | 示例 |
---|---|---|---|
statusCode | int64 | 返回码,取值范围:0:-正常、-1:严重错误,其他自定义错误码 | |
message | string | 状态描述 | SUCCESS |
error | string | 参考错误编码列表 |
日志服务相关错误编码(部分):
statusCode | error | message |
---|---|---|
-1 | LTS_8000 | 请求失败,请稍候重试,或提交工单反馈 |
-1 | LTS_8001 | 内容不合法,无法解析 |
-1 | LTS_8004 | 日志内容包含的日志必须小于[x] MB和[y]条 |
-1 | LTS_8006 | 日志内容解压失败 |
-1 | LTS_8007 | Token失效,请重新获取 |
-1 | LTS_8009 | 无云日志服务产品实例,请先开通云日志服务 |
-1 | LTS_8010 | 日志项目不存在 |
-1 | LTS_8011 | 日志单元不存在 |
-1 | LTS_8013 | 在1个日志项目下,写入流量最大限制:200MB/s |
-1 | LTS_8014 | 在1个日志项目下,写入次数最大限制:1000次/s |
-1 | LTS_8015 | 在1个日志单元下,写入流量最大限制:100MB/s |
-1 | LTS_8016 | 在1个日志单元下,写入次数最大限制:500次/s |
-1 | LTS_18000 | 调用ITIAM的接口失败 |
5. 服务代码-异步批量上传
异步上传是为了解决同步上传无法高频异步发送等问题所增加的模块。原理是会开启多个协程,当调用日志发送接口后,会立刻返回,而内部的协程会将日志数据缓存合并,最后进行批量发送。异步上传特点如下:安全设计、高效异步传输、智能重试机制、详尽的行为跟踪 、优雅关闭流程等
性能卓越,在面临海量数据和高资源压力的场景下,producer 凭借多协程、智能缓存和批量发送等高级功能,帮助用户轻松达到目标吞吐量,同时简化了程序设计和开发流程。
异步处理优势,在内存资源充足的情况下,producer 的异步发送机制使得用户调用 sendLogs 方法时无需等待,实现了计算与 I/O 逻辑的有效分离。用户可以通过 callback 了解日志发送状态。
5.1. 关于Producer操作
5.1.1. GetDefaultProducerConfig()
此操作是获取producer的默认的配置文件。producer可以看作是一个启动器,内部封装了异步协程的初始化、启动和关闭等功能,只需要对producer进行操作,即可安全便捷地控制这些异步的协程。使用这份producerConfig配置去初始化一个producer。
producerConfig := lts.GetDefaultProducerConfig()
producerConfig.AllowLogLevel = "warn"
producerConfig.LingerMs = 2000
...
producer := lts.InitProducer(producerConfig)
producerConfig内的属性是异步操作中的线程所需要的参数,如果不设置参数,则初始化的时候会使用默认的参数,默认参数如下所示:
参数 | 类型 | 描述 |
---|---|---|
TotalSizeLnBytes | Int64 | 单个 producer 实例能缓存的日志大小上限,默认为 100MB。 |
MaxIoWorkerCount | Int64 | 单个producer能并发的最多groutine的数量,默认为50,该参数用户可以根据自己实际服务器的性能去配置。 |
MaxBlockSec | Int | 如果 producer 可用空间不足, send 方法的最大阻塞时间,默认为60秒。如果超过这个时间后空间仍无法得到满足,send 方法会抛出TimeoutException。如果将s值设为0,当所需空间无法得到满足时,send 方法会立即抛出 TimeoutException。如果希望 send 方法一直阻塞直到所需空间得到满足,可设为负数。 |
MaxBatchSize | Int64 | 当一个 ProducerBatch 中缓存的日志大于等于batchSizeThresholdInBytes 时,该 batch 将被发送,默认为 512 KB,最大5MB。 |
MaxBatchCount | Int | 当一个 ProducerBatch 中缓存的日志条数大于等于 batchCountThreshold 时,该 batch 将被发送,默认为 4096,最大40960。 |
LingerMs | Int64 | 一个 ProducerBatch 从创建到可发送的逗留时间,默认为 2 秒,最小可设置成 100 ms。 |
Retries | Int | 如果某个 ProducerBatch 首次发送失败,能够对其重试的次数,默认为 10 次。 如果 retries 小于等于 0,该 ProducerBatch 首次发送失败后将直接进入失败队列。 |
MaxReservedAttempts | Int | 每个 ProducerBatch 每次被尝试发送都对应着一个 Attemp,此参数用来控制返回给用户的 attempt 个数,默认只保留最近的 11 次 attempt 信息。该参数越大能让您追溯更多的信息,但同时也会消耗更多的内存。 |
BaseRetryBackoffMs | Int64 | 首次重试的退避时间,默认为 100 毫秒。 Producer 采样指数退避算法,第 N 次重试的计划等待时间为 baseRetryBackoffMs * 2^(N-1)。 |
MaxRetryBackoffMs | Int64 | 重试的最大退避时间,默认为 50 秒。 |
AllowLogLevel | String | 设置日志输出级别分别为debug,info,warn和error,默认值是warn, |
LogFileName | String | 日志文件输出路径,默认输出到stdout。 |
IsJsonType | Bool | 是否格式化文件输出格式,默认为false。 |
LogMaxSize | Int | 单个日志存储数量,默认为10M。 |
LogMaxBackups | Int | 日志轮转数量,默认为10。 |
NoRetryStatusCodeList | []int | 用户配置的不需要重试的错误码列表,当发送日志失败时返回的错误码在列表中,则不会重试。默认包含400,403,404。 |
5.1.2. BuildClient()
此操作是根据clientConfig配置初始化client,它会根据配置中的logProject属性去初始化一个client,不同的logProject会构建不同的client,使用不同的配置就可以构建多个client,每个client负责该project项目下的日志发送任务。
clientConfig := lts.GetDefaultClientConfig(endpoint, ak, sk, logProject)
clientConfig2 := lts.GetDefaultClientConfig(endpoint, ak2, sk2, logProject2)
err := producerInstance.BuildClient(clientConfig)
if err != nil {
fmt.Println("client initialization failed, err:", err)
return
}
err = producerInstance.BuildClient(clientConfig2)
if err != nil {
fmt.Println("client2 initialization failed, err:", err)
return
}
5.1.3. Close()
此操作是用于关闭producer。当不再需要发送数据或当前进程即将终止时,关闭producer是必要的步骤,以确保producer中缓存的所有数据都能得到妥善处理。当前,producer提供了两种关闭模式:安全关闭和有限关闭。
安全关闭模式确保在关闭producer之前,所有缓存的数据都已完成处理,所有相关协程都已关闭,并且所有注册的回调函数都已执行完毕。一旦producer被安全关闭,缓存的批次数据会立即得到处理,并且不会被重试。如果回调函数没有被阻塞,close方法通常能够迅速返回。
producer.SafeClose()
有限关闭模式适用于那些可能存在阻塞回调函数的场景,但您又希望close方法能在指定的时间内返回。有限关闭会接收用户传递的一个参数值,时间单位为秒,当开始关闭producer的时候开始计时,超过传递的设定值还未能完全关闭producer的话会强制退出producer,此时可能会有部分数据未被成功发送而丢失。
producer.close(60)
5.2. 关于异步发送操作
5.2.1. SendLogs()
此操作是将日志发送到后台的日志累加器队列中,然后立刻返回。累加器的状态达到可发送条件时(日志量达到阈值或者等待时间达到阈值),后台任务的线程将里面的日志进行打包批量发送。
producer.sendLogs(logProject, logUnit, logItem)
sendLogs()方法有很多同类方法,可以满足多种类型的发送,既可以发送单条日志,也可以发送多条日志,同时也可以根据需求是否需要结果返回值。类型如下:
sendLogs(logProject string, logUnit string, logItem *LogItem)
sendLogList(logProject string, logUnit string, logList []*LogItem)
sendLogsWithCallBack(logProject string, logUnit string, logItem *LogItem)
sendLogsListWithCallBack(logProject string, logUnit string, logList []*LogItem)
5.3. 关于获取发送结果的操作
5.3.1. Callback
在调用sendLogsWithCallBack方法时注册 callback 获取数据发送结果,代码片段如下。(可根据需求自行修改, io_worker.go)
func (callback *Callback) Success(result *Result) {
attemptList := result.GetReservedAttempts()
for _, attempt := range attemptList {
fmt.Println("success ,statusCode:", attempt.StatusCode, ",message:", attempt.ErrorMessage, ",error:", attempt.ErrorCode)
}
}
func (callback *Callback) Fail(result *Result) {
attemptList := result.GetReservedAttempts()
for _, attempt := range attemptList {
fmt.Println("fail , statusCode:", attempt.StatusCode, ",message:", attempt.ErrorMessage, ",error:", attempt.ErrorCode)
}
}