1. 前言
安装使用Python SDK可以帮助开发者快速接入并使用天翼云的云日志服务相关功能。
2. 使用条件
2.1. 先决条件
用户需要具备以下条件才能够使用LTS SDK Python 版本:
1、购买并订阅了天翼云的云日志服务,并创建了日志项目和日志单元,获取到相应编码(logProject、logUnit)。
2、已获取AccessKey 和 SecretKey。
3、已安装Python3.6或以上版本。
2.2. 下载及安装
下载ctyun_lts_python_sdk.zip压缩包,放到相应位置后并解压。“ctyun_lts_python_sdk”目录中“example”为SDK的使用示例代码。
在“ctyun_lts_python_sdk”目录下执行标准 python 包安装命令:
# python setup.py install
安装命令会安装SDK所需要的依赖包,并将整个SDK作为一个python包安装到您的python环境中。其中会同步安装lz4、six、protobuf>=3.5.2、requests。
如果上述命令安装不成功,也可以手动逐步安装这四个python包。
pip install requests==2.27.1
pip install lz4==3.1.10
pip install six
pip install protobuf==3.5.2
之后可以通过一下命令查看是否安装成功,安装后的python包名为ctyun-lts-python-sdk。
pip list # 成功安装会出现 ctyun-lts-python-sdk
如果修改了SDK的源码,并希望重新安装使用,可以先卸载再安装。
pip uninstall ctyun-lts-python-sdk
python setup.py install
安装完python包之后,只需要引入lts的包就可以使用SDK的功能了。
from lts import LogItem, get_current_timestamp, LogClient, LogException
然后运行程序,以sample_putlogs举例:
python sample_putlogs.py
3. SDK使用设置
3.1. 基本设置
使用 SDK访问 LTS 的服务,需要设置正确的 AccessKey、SecretKey 和服务端 Endpoint,所有的服务可以使用同一 key 凭证来进行访问,但不同的服务需要使用不同的 endpoint 进行访问,详情参考天翼云官网-SDK接入概述。在调用前SDK,需要已知以下参数:
1、云日志服务访问地址。详情请查看访问地址(Endpoint)。
2、key凭证:accessKey和secretKey 。详情请查看如何获取访问密钥(AK/SK)。
3、日志项目编码:logProject,在使用SDK前,需要确保您有至少一个已经存在的日志项目,日志项目就是您要将日志上传到的地方。
4、日志单元编码:logUnit,在使用SDK前,需要确保日志项目中有至少一个已经存在的日志单元。
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
endpoint | string | 域名 | 是 |
accessKey | string | AccessKey,简称ak | 是 |
secretKey | string | SecretKey ,简称sk | 是 |
logProject | string | 日志项目编码 | 是 |
logUnit | string | 日志单元编码 | 是 |
目前通过SDK将日志上传有两种上传形式:同步上传和异步批量上传。
1、同步上传:当调用日志上传接口时,sdk会立即进行网络请求调用,并返回发送结果。这种方式结构简单,可用于发送频率不高的场景。
2、异步批量上传:当调用日志上传接口时,后台线程会将日志进行累积,当达到发送条件时,会进行一次合并发送。对于需要频繁调用发送接口的场景,这种方式性能更卓越,更高效。
示例代码:同步上传
from lts import LogItem, get_current_timestamp, LogClient, LogException, ClientConfig
def main():
access_key = "your accessKey"
secret_key = "your secretKey"
log_project = "log project Code"
log_unit = "log unit Code"
endpoint = "endpoint"
log_items = []
log_item = LogItem()
curr_time = get_current_timestamp(3) # 获取当前的时间戳,单位纳秒
log_item.set_log_timestamp(curr_time)
log_item.set_origin_msg("Python sdk test oriMessage")
log_item.contents_push_back("level", "info")
log_item.contents_push_back("area", 3.1415926)
log_item.labels_push_back("user_tag", "string")
log_num = 10
for i in range(0, log_num):
log_items.append(log_item)
try:
client_config = ClientConfig(endpoint, access_key, secret_key,log_project)
log_client = LogClient(client_config)
for i in range(0, 100): # send 100 times
log_response = log_client.put_logs(log_project, log_unit, log_items)
log_response.log_print_body()
except LogException as ex:
print(ex)
if __name__ == "__main__":
main()
示例代码:异步批量上传
import time
from asynchronous.producer import Producer
from asynchronous.producer_config import ProducerConfig
from lts import LogItem, get_current_timestamp, LogException, ClientConfig
def main():
access_key = "your accessKey"
secret_key = "your secretKey"
log_project = "log project Code"
log_unit = "log unit Code"
endpoint = "endpoint"
log_items = []
log_item = LogItem()
curr_time = get_current_timestamp(3)
log_item.set_log_timestamp(curr_time)
log_item.set_origin_msg("Python sdk test oriMessage")
log_item.contents_push_back("level", "info")
log_item.contents_push_back("area", 3.1415926)
log_item.labels_push_back("user_tag", "string")
log_num = 10
for i in range(0, log_num):
log_items.append(log_item)
producer_config = ProducerConfig()
producer = Producer(producer_config)
try:
client_config = ClientConfig(endpoint, access_key, secret_key, log_project)
producer.build_clients(client_config)
producer.start()
for i in range(0, 100): # send 100 times
producer.send_logs_callback(log_project, log_unit, log_items)
time.sleep(5)
for i in range(0, 100): # send 100 times
producer.send_logs_callback(log_project, log_unit, log_items)
time.sleep(5)
# 关闭producer 发送
producer.safe_close()
except LogException as ex:
print(ex)
4. 服务代码示例-同步上传
同步上传每调用一次发送日志的方法,就会进行一次https请求发送日志,然后返回日志发送结果。同步上传对于需求简单,发送频率不高,发送量不大的场景比较适用。虽然是同步发送,但是可以将多条日志合在一起,一次性发送,减少API请求次数。
4.1. 关于Client的操作
4.1.1. ClientConfig()
此操作是初始化一个client的默认配置,里面包含如下参数,通过这份配置就可以去初始化一个client。
参数 | 参数类型 | 描述 | 是否必须 |
---|---|---|---|
endpoint | string | 域名 | 是 |
access_key | string | 用户信息凭证,简称ak | 是 |
secret_key | string | 用户信息凭证,简称sk | 是 |
log_project | string | 日志项目编码 | 是 |
request_timeout | int | http请求超时时间,默认30s | 否 |
compress_type | string | 日志压缩算法,默认“lz4” | 否 |
user_agent | string | lts-sdk-python/{sdk版本信息} | 否 |
retries | int | 日志上传失败的重试次数 | 否 |
no_retry_status_code | list | 不进行重试的响应状态码 | 否 |
示例代码:初始化ClientConfig
client_config = ClientConfig(endpoint, access_key, secret_key,log_project)
# 可对默认配置进行变更
client_config.request_timeout = 30
Client_config.retries = 3
...
4.1.2. LogClient()
此操作是初始化Client。用户使用client_config配置就可以去初始化一个Client。Client类似一个日志服务的处理器,它对API进行了有效的封装处理,只需要调用特定接口就可以使用对应功能。初始化Client之后,其包含的配置信息如下:
参数 | 参数类型 | 描述 | 是否必须 |
---|---|---|---|
endpoint | string | client_config 内的参数 | 是 |
... | String | client_config 内的其他参数,见上表 | 是 |
security_token | tuple | ak/sk生成的用户临时凭证信息{token字符串,过期时间} | 否 |
user_agent | string | lts-sdk-python/{sdk版本信息} | 否 |
session | Session | requests.Session(),用于http请求连接的复用 | 否 |
lock | Lock | threading.Lock() | 否 |
示例代码:初始化Client
client_config = ClientConfig(endpoint, access_key, secret_key,log_project)
log_client = LogClient(client_config)
4.2. 关于临时凭证Token的操作
4.2.1. init_security_token()
此操作是为client注入security_token信息,这一步需要使用ak和sk信息换取临时凭证security_token,其中包含了token随机串和过期时间两个参数。这一步需要去访问CTIAM的api接口,调用api接口,传入ak/sk/endpoint信息,返回token信息。
Security_token信息如下:
参数 | 类型 | 描述 |
---|---|---|
token | string | token 随机串 |
expire_time | int | 过期时间,默认30分钟 |
获取security_token这一步在Client 初始化时会自动调用。用户默认可以不用进行这一步操作。
示例代码:为client注入Token信息
def _init_security_token(self):
token,expire_time=
self._ak_sk_to_token(self.get_access_key(),self.get_secret_key())
token_tuple = (token, expire_time)
4.3. 关于Log的操作
4.3.1. log_items.append(log_item)
此操作用于生成待上传的日志,日志上传只能上传LogItem格式的日志,log_items是一个数组类型,里面包含若干条LogItem日志,格式如下:
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
log_items | []LogItem | LogItem格式的数组,将多份日志组合起来发送 | 是 |
配置一个或多个log_item,然后放入log_items下的数组,作为参数传递,这就是要传入的日志信息,LogItem类型需要的参数如下。
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
log_timestamp | int | 时间戳,单位纳秒 | 是 |
origin_msg | string | 原始日志内容 | 是 |
contents | dict | 日志内容,分词后的内容 | 否 |
labels | dict | 自定义标签 | 否 |
注意:其中contents和labels的key的长度不超过64字符,仅支持数字、字母、下划线、连字符(-)、点(.),且必须以字母开头。value类型最好使用字符串(string)和数字类型(int,double),其他类型建议先转为字符串类型,并且value值不能为空或空字符串。
示例代码:组装生成10条日志
log_items = []
log_item = LogItem()
curr_time = get_current_timestamp(3)
log_item.set_log_timestamp(curr_time)
log_item.set_origin_msg("Python sdk test oriMessage")
log_item.contents_push_back("contentInt", curr_time)
log_item.contents_push_back("level", "info")
log_item.contents_push_back("area:", 3.1415926)
log_item.contents_push_back("unit_id", 123145)
log_item.labels_push_back("user_tag", "string")
for i in range(1, 10):
log_items.append(log_item)
当然,对于dict 类型的contents 和labels 类型,也可以用set方法进行赋值,减少push_back()操作。
contents = {"contentInt": 123456,"contentString": "string content"}
log_item.set_contents(contents)
4.4. 关于日志上传的操作
4.4.1. put_logs()
此操作用于日志上传服务,需要传入的参数有三个,分别是log_project(日志项目编码),unit_code(日志单元编码),log_items(要上传的日志)。
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
log_project | string | 日志项目编码 | 是 |
unit_code | string | 日志单元编码 | 是 |
log_items | []LogItem | 日志信息 | 是 |
示例代码:上传日志
try:
log_client = LogClient(endpoint=endpoint, access_key=ak, secret_key=sk)
log_response = log_client.put_logs(log_project, unit_code, log_items)
log_response.log_print_body()
except LogException as ex:
print(ex)
resp_body有如下种类:
参数 | 类型 | 描述 | 示例 |
---|---|---|---|
statusCode | int | 返回码取值范围:0-正常、-1:严重错误其他自定义 | |
message | string | 状态描述 | SUCCESS |
error | string | 参考错误编码列表 |
日志服务相关错误编码(部分):
statusCode | error | message |
---|---|---|
-1 | LTS_8000 | 请求失败,请稍候重试,或提交工单反馈 |
-1 | LTS_8001 | 内容不合法,无法解析 |
-1 | LTS_8004 | 日志内容包含的日志必须小于[x] MB和[y]条 |
-1 | LTS_8006 | 日志内容解压失败 |
-1 | LTS_8007 | Token失效,请重新获取 |
-1 | LTS_8009 | 无云日志服务产品实例,请先开通云日志服务 |
-1 | LTS_8010 | 日志项目不存在 |
-1 | LTS_8011 | 日志单元不存在 |
-1 | LTS_8013 | 在1个日志项目下,写入流量最大限制:200MB/s |
-1 | LTS_8014 | 在1个日志项目下,写入次数最大限制:1000次/s |
-1 | LTS_8015 | 在1个日志单元下,写入流量最大限制:100MB/s |
-1 | LTS_8016 | 在1个日志单元下,写入次数最大限制:500次/s |
-1 | LTS_18000 | 调用ITIAM的接口失败 |
5. 服务代码-异步上传
异步上传是为了解决同步上传无法高频异步发送等问题所增加的模块。原理是会开启多个线程,当调用日志发送接口后,会立刻返回,而内部的线程会将日志数据缓存合并,最后进行批量发送。
5.1. 关于Producer的操作
5.1.1. ProducerConfig()
此操作是初始化一个producer的默认配置,里面包含如下参数,通过这份配置就可以去初始化一个producer。
参数 | 参数类型 | 描述 | 是否必须 |
---|---|---|---|
default_batch_size | int | 每批次发送的日志的容量,默认512KB,最大5MB | 是 |
default_batch_count | int | 每批次发送的日志的条数,默认4096条,最大40960条 | 是 |
default_linger_ms | int | 批量发送时日志留存时间,默认2000ms,最小100ms | 是 |
default_worker_count | int | 日志发送的最大线程数,默认8个线程 | 是 |
示例代码:初始化ProducerConfig()配置
producer_config = ProducerConfig()
# 可对默认配置进行变更
producer.max_batch_size = 512*1024
producer.max_max_batch_count = 4096
...
5.1.2. Producer()
此操作是初始化producer,producer是一个异步任务的启动器,里面封装了异步任务的逻辑操作。使用producer_config 就可对其进行初始化。
示例代码:初始化Producer
producer = Producer(producer_config)
5.1.3. build_client()
此操作是根据client_config配置初始化client,其中logProject属性是client的唯一标识,不同的logProject会构建不同的client,使用不同的配置就可以构建多个client,每个client负责该project项目下的日志发送任务。
示例代码:构建两个client
client_config = ClientConfig(endpoint, ak, sk, log_project)
client_config2 = ClientConfig(endpoint, ak, sk, log_project2)
producer.build_clients(client_config)
producer.build_clients(client_config2)
5.1.4. start()
此操作是启动producer,使用start()方法后,producer内部的一些异步线程就会启动,之后就可以进行日志上传等任务了。
producer.start()
5.1.5. safe_close()
此操作是用于关闭producer。当不再需要发送数据或当前进程即将终止时,关闭producer是必要的步骤,以确保producer中缓存的所有数据都能得到妥善处理。当前,producer提供了两种关闭模式:安全关闭和有限关闭。
安全关闭模式确保在关闭producer之前,所有缓存的数据都已完成处理,所有相关线程都已关闭,并且所有注册的回调函数都已执行完毕。一旦producer被安全关闭,缓存的批次数据会立即得到处理。如果回调函数没有被阻塞,close方法通常能够迅速返回。
producer.safe_close()
有限关闭模式适用于那些可能存在阻塞回调函数的场景,但您又希望close方法能在指定的时间内返回。为此,可以使用close(long timeoutMs)方法,并指定一个超时时间。如果超过了指定的timeoutMs时间后producer仍未完全关闭,该方法将抛出一个Exception异常,这意味着可能还有部分缓存的数据未及时处理就被丢弃,同时用户注册的回调函数也可能不会被执行。
producer.close(2000)
5.2 异步上传操作
5.2.1 send_logs()
此操作是将日志发送到后台的日志累加器队列中,然后立刻返回。累加器的状态达到可发送条件时(日志量达到阈值或者等待时间达到阈值),后台任务的线程将里面的日志进行打包批量发送。发送的日志可以是单条,也可以是多条,同种这种发送方式不会返回响应结果,所以可能失败也可能成功。
示例代码:日志上传发送,无回调结果
# 发送单条日志
producer.send_logs(log_project, log_unit, log_item)
# 发送多条日志,log_items 是 log_item 的队列
producer.send_logs(log_project, log_unit, log_items)
5.2.2 send_logs_callback()
此操作是将日志异步批量发送,但会返回发送的响应结果,有利于检测当前的发送状况。
producer.send_logs_callback(log_project, log_unit, log_item)
producer.send_logs_callback(log_project, log_unit, log_items)
关于回调函数,可以通过修改io_work.py 中的callback()方法来进行自定义。回调结果是异步非阻塞的。
def callback(self, future):
result = future.result()
print(f"response: {result[0]}, send count : {result[1]}")
self.logger.info(f"response: {result[0]}, send count : {result[1]}")
# 回调结果:
response: {"statusCode":0,"message":"SUCCESS","error":""}, send count : 3070