1. 前言
安装使用Go SDK可以帮助开发者快速接入并使用天翼云的日志服务相关功能,目前支持同步上传,异步批量上传等功能。
2. 使用条件
2.1. 先决条件
用户需要具备以下条件才能够使用LTS SDK Java版本:
1、购买并订阅了天翼云的云日志服务,并创建了日志项目和日志单元,获取到相应编码(logProject、logUnit)。
2、已获取AccessKey 和 SecretKey。
3、已安装JDK1.8及以上环境。
2.2. 下载及安装
下载ctyun_lts_java_sdk.zip压缩包,放到相应位置后并解压,把包放在本地目录:<base_path>。如果您想直接使用SDK,可以不做修改,直接使用SDK源码,示例代码为example/SamplePutlogs.java。。
- 把SDK源码构建成jar包,可通过构建工具构建或者直接使用已有jar包:”ctyun-lts-java-sdk-1.6.0.jar”。
- 把生成的jar包引入本地maven仓库。
mvn install:install-file -Dfile=ctyun-lts-java-sdk-1.6.0.jar -DgroupId=cn.ctyun.lts -DartifactId=ctyun-lts-java-sdk -Dversion=1.6.0 -Dpackaging=jar
- 在您的maven工程的pom.xml文件中增加配置
<dependency>
<groupId>cn.ctyun.lts</groupId>
<artifactId>ctyun-lts-java-sdk</artifactId>
<version>1.6.0</version>
</dependency>
- 引入第三方依赖包
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83_noneautotype</version>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
<artifactId>lz4</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>io.opentelemetry.proto</groupId>
<artifactId>opentelemetry-proto</artifactId>
<version>1.1.0-alpha</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpmime</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.23.4</version>
</dependency>
3. SDK基本使用
3.1. 基本使用
使用 SDK访问 LTS 的服务,需要设置正确的 AccessKey、SecretKey 和服务端 Endpoint,所有的服务可以使用同一 key 凭证来进行访问,但不同的服务需要使用不同的 endpoint 进行访问,详情参考天翼云官网-SDK接入概述。在调用前SDK,需要已知以下参数:
1、云日志服务访问地址。详情请查看访问地址(Endpoint)。
2、key凭证:accessKey和secretKey 。详情请查看如何获取访问密钥(AK/SK)。
3、日志项目编码:logProject,在使用SDK前,需要确保您有至少一个已经存在的日志项目,日志项目就是您要将日志上传到的地方。
4、日志单元编码:logUnit,在使用SDK前,需要确保日志项目中有至少一个已经存在的日志单元。
参数 | 参数类型 | 描述 | 是否必须 |
---|---|---|---|
endpoint | string | 域名 | 是 |
accessKey | string | AccessKey,简称ak | 是 |
secretKey | string | SecretKey ,简称sk | 是 |
logProject | string | 日志项目编码 | 是 |
logUnit | string | 日志单元编码 | 是 |
目前通过SDK将日志上传到云日志服务有两种上传形式:同步上传和异步上传。
1、同步上传:当调用日志上传接口时,sdk会立即进行http请求调用,并返回发送结果。这种方式结构简单,可用于发送频率不高的场景。
2、异步上传:当调用日志上传接口时,后台线程会将日志进行累积,当达到发送条件时,会进行一次合并发送。对于需要频繁调用发送接口的场景,这种方式性能更卓越,更高效。
示例代码:同步上传
public static void main(String args[]) throws LogException, InterruptedException {
String accessKey = "your accessKey";
String secretKey = "your secretKey";
String logProject = "log project Code";//日志项目ID
String logUnit = "log unit Code";//日志单元ID
String endpoint = "endpoint";
Credentials credentials = new SecurityTokenCredentials(accessKey, secretKey, endpoint);
CredentialsProvider provider = new StaticCredentialsProvider(credentials);
ClientBuilder builder = new ClientBuilder(endpoint, provider);
Client client = builder.build();
//构建日志
LogItem logItem = new LogItem(System.currentTimeMillis());
logItem.setOrigin_msg(" sync ,java, test message!");
logItem.PushBackContent("level", "info");
logItem.PushBackLabel("usage_tag", "string");
LogItems logItems = new LogItems();
logItems.add(logItem);
try {
for (int i = 0; i < 100; i++) { //发送100次
PutLogsResponse response = client.PutLogs(logProject, logUnit, logItems, "");
PutLogsResponseBody res = response.getBody();
System.out.println("response: statusCode:"+res.getStatusCode()+" ,message"+res.getMessage()+" ,error:"+res.getError());
}
} catch (LogException e) {
System.out.println("error :" + e.getErrorCode() + " , " + e.getMessage() + " , " + e.getHttpCode());
e.printStackTrace();
} catch (Exception ex) {
ex.printStackTrace();
}
}
示例代码:异步批量上传
public static void main(String[] args) throws Exception {
String accessKey = "your accessKey";
String secretKey = "your secretKey";
String logProject = "log project Code";//日志项目ID
String logUnit = "log unit Code";//日志单元ID
String endpoint = "endpoint";
// 初始化日志生产者
Producer producer = new LogProducer(new ProducerConfig());
//以日志项目编码为区分,根据日志项目编码,会创建一个client
ProjectConfig projectConfig = new ProjectConfig(logProject, endpoint, accessKey, secretKey);
producer.buildClient(projectConfig);
int task = 10; // 要发送的日志任务数
// 提交日志发送任务到线程池
for (int i = 0; i < task; ++i) {
EXECUTOR_SERVICE.submit(
new Runnable() {
@Override
public void run() {
LogItem logItem = Utils.generateLogItem(1);
try {
// 发送日志,使用自定义回调函数
producer.sendLogs(logProject, logUnit, logItem, customCallback());
} catch (InterruptedException e) {
LOGGER.warn("线程在发送日志时被中断。");
} catch (Exception e) {
LOGGER.error("发送日志失败,日志项: {}", logItem, e);
}
}
});
}
// 稍作等待后关闭生产者
Thread.sleep(5000);
try {
producer.close();
} catch (InterruptedException e) {
LOGGER.warn("关闭生产者时线程被中断。");
} catch (ProducerException e) {
LOGGER.info("关闭生产者失败: {}", e);
}
// 等待所有任务完成
EXECUTOR_SERVICE.shutdown();
}
//回调函数
private static Callback customCallback() {
Callback callback = new Callback() {
@Override
public void onCompletion(Result result) {
// 处理回调结果
if (result.isSuccessful()) {
LOGGER.info("response: statusCode:{} , message:{} , errorCode: {}",result.getStatusCode(), result.getErrorMessage(), result.getErrorCode());
} else {
LOGGER.info("response: statusCode:{} , message:{} , errorCode: {}",result.getStatusCode(), result.getErrorMessage(), result.getErrorCode());
}
}
};
return callback;
}
4. 服务代码示例-同步上传
4.1. 关于Client的操作
4.1.1. New Client()
此操作是初始化Client。用户需要配置至少3个关键的参数才够初始化Client。
参数 | 参数类型 | 描述 | 是否必须 |
---|---|---|---|
endpoint | string | 域名 | 是 |
accessKey | string | AccessKey,简称ak | 是 |
secretKey | string | SecretKey ,简称sk | 是 |
示例代码:初始化Client配置
Credentials credentials = new SecurityTokenCredentials(accessKey , secretKey );
CredentialsProvider provider = new StaticCredentialsProvider(credentials);
ClientBuilder builder = new ClientBuilder(endpoint, provider);
Client client = builder.build();
4.2. 关于Log的操作
4.2.1. logData.PushBackContent(logItem)
此操作用于生成待上传的日志,日志上传只能上传LogItem格式的日志,logData包含了一个ArrayList
LogItem类型的日志格式如下:
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
log_timestamp | long | 时间戳,单位纳秒 | 是 |
origin_msg | string | 原始日志内容 | 是 |
content | ArrayList |
分词后的日志内容,可用于索引 | 否 |
labels | ArrayList |
自定义标签 | 否 |
其中LogContent 和Labels是包含了两个成员变量的类,成员变量为key:string,value:T。
示例代码:组装生成1条日志
LogData logData = new LogData();
LogItem logItem = new LogItem(System.currentTimeMillis());
logItem.setOrigin_msg("java, test message!");
logItem.PushBackContent("level", "info");
logItem.PushBackContent("unit_id", "12345678");
logItem.PushBackContent("area", 1.3145);
logItem.PushBackLabel("usage_tag", "string");
logData.add(logItem);
注意:其中content和labels的key的长度不超过64字符,仅支持数字、字母、下划线、连字符(-)、点(.),且必须以字母开头。value类型最好使用字符串(String)和数字类型(int,double),其他类型建议先转为字符串类型,并且value值不能为空或空字符串。
4.3. 关于日志上传的操作
4.3.1. PutLogs()
此操作用于日志上传服务,需要传入的参数有四个,分别是logProject(日志项目编码),logUnit(日志单元编码),logData(要上传的日志),source(日志来源)。
参数 | 类型 | 描述 | 是否必须 |
---|---|---|---|
logProject | string | 日志项目编码 | 是 |
logUnit | string | 日志单元编码 | 是 |
logData | vector |
日志信息 | 是 |
source | string | 日志的来源 | 是 |
示例代码:上传日志
PutLogsResponse reps = client.PutLogs(logProject, logUnit, logData, "");
System.out.println("resp:" + JSON.toJSONString(reps.getBody()) );
PutLogsResponse 里面包含了请求的响应头header和响应体body,其中接收响应数据的响应体body格式如下:
参数 | 类型 | 描述 | 示例 |
---|---|---|---|
statusCode | int | 返回码,取值范围:0:-正常、-1:严重错误,其他自定义 | |
message | string | 状态描述 | SUCCESS |
error | string | 参考错误编码列表 |
日志服务相关错误编码(部分):
statusCode | error | message |
---|---|---|
-1 | LTS_8000 | 请求失败,请稍候重试,或提交工单反馈 |
-1 | LTS_8001 | 内容不合法,无法解析 |
-1 | LTS_8004 | 日志内容包含的日志必须小于[x] MB和[y]条 |
-1 | LTS_8006 | 日志内容解压失败 |
-1 | LTS_8007 | Token失效,请重新获取 |
-1 | LTS_8009 | 无云日志服务产品实例,请先开通云日志服务 |
-1 | LTS_8010 | 日志项目不存在 |
-1 | LTS_8011 | 日志单元不存在 |
-1 | LTS_8013 | 在1个日志项目下,写入流量最大限制:200MB/s |
-1 | LTS_8014 | 在1个日志项目下,写入次数最大限制:1000次/s |
-1 | LTS_8015 | 在1个日志单元下,写入流量最大限制:100MB/s |
-1 | LTS_8016 | 在1个日志单元下,写入次数最大限制:500次/s |
-1 | LTS_18000 | 调用ITIAM的接口失败 |
5. 服务代码-异步上传
异步上传是为了解决同步上传无法高频异步发送等问题所增加的模块。原理是会开启多个线程,当调用日志发送接口后,会立刻返回,而内部的线程会将日志数据缓存合并,最后进行批量发送。异步上传特点如下:
- 线程安全设计 - producer 接口的所有对外暴露方法均经过精心设计,确保在多线程环境下安全无虞。
- 高效异步传输 - 调用 producer 的发送接口,用户可以迅速获得响应,而数据的实际发送则会在后台异步进行,并通过缓存和合并机制优化传输效率。
- 智能重试机制 - producer 配备智能重试功能,对于可重试的异常,将按照用户预设的最大重试次数和退避策略自动重试,确保数据的稳定传输。
- 详尽的行为跟踪 - 用户可通过 callback 或 future 机制获取数据发送的实时状态,包括每次尝试发送的详细信息,为问题排查和决策制定提供有力支持。
- 上下文一致性 - producer 保证同一实例产生的日志在服务端保持上下文一致,便于用户查看和分析日志间的关联关系。
- 优雅关闭流程 - 当用户调用 close 方法时,producer 将确保缓存中的所有数据得到妥善处理,并为用户提供关闭完成的通知,确保资源得到正确释放。
性能卓越,在面临海量数据和高资源压力的场景下,producer 凭借多线程、智能缓存和批量发送等高级功能,帮助用户轻松达到目标吞吐量,同时简化了程序设计和开发流程。
异步处理优势,在内存资源充足的情况下,producer 的异步发送机制使得用户调用 send 方法时无需等待,实现了计算与 I/O 逻辑的有效分离。用户可以通过返回的 future 或 callback 随时了解数据发送状态。
精细的资源管理,用户可以通过灵活配置参数,精确控制 producer 使用的内存大小和发送任务的线程数。这不仅避免了资源的无限制消耗,还能根据实际应用场景平衡资源消耗和写入性能。
5.1.关于Producer操作
5.1.1. LogProducer()
此操作是初始化producer的,producer可以看作是一个启动器,内部封装了异步线程的初始化、启动和关闭等功能,只需要对producer进行操作,即可安全便捷地控制这些异步的线程。使用这份producerConfig配置去初始化一个producer。
ProducerConfig producerConfig = new ProducerConfig();
producerConfig.setLingerMs(2000);
producerConfig.setBatchCountThreshold(4096);
....
Producer producer = new LogProducer(producerConfig);
producerConfig内的属性是异步操作中的线程所需要的参数,如果不设置参数,则初始化的时候会使用默认的参数,默认参数如下所示:
// 默认总大小(字节)。默认情况下,总大小限制为100MB。
int DEFAULT_TOTAL_SIZE_IN_BYTES = 100 * 1024 * 1024;
// 默认最大块处理时间。默认情况下,单个块的最大处理时间为1分钟。
long DEFAULT_MAX_BLOCK_MS = 60 * 1000L;
// 默认IO线程数。默认值为可用处理器数量的最大值和1之间的较大值。
int DEFAULT_IO_THREAD_COUNT = Math.max(Runtime.getRuntime().availableProcessors(), 1);
// 默认批次大小阈值。当数据量达到512KB时,将触发一批次的处理。
int DEFAULT_BATCH_SIZE_THRESHOLD_IN_BYTES = 512 *1024;
// 最大批次大小(字节)。默认情况下,单个批次的最大大小限制为5MB。
int MAX_BATCH_SIZE_IN_BYTES = 5* 1024 * 1024;
// 默认批次计数阈值。当积累达到4096个条目时,将触发一批次的处理。
int DEFAULT_BATCH_COUNT_THRESHOLD = 4096;
// 最大批次计数。默认情况下,单个批次的最大条目数限制为40960。
int MAX_BATCH_COUNT = 40960;
// 默认linger时间(毫秒)。 linger等待的时间默认为2秒。
int DEFAULT_LINGER_MS = 2000;
// Linger时间的下限。linger时间的最小值为100毫秒。
int LINGER_MS_LOWER_LIMIT = 100;
// 默认重试次数。默认情况下,操作失败后将重试10次。
int DEFAULT_RETRIES = 10;
//默认基础重试退避时间(毫秒)。重试之间的默认最小间隔为100毫秒。
long DEFAULT_BASE_RETRY_BACKOFF_MS = 100L;
// 默认最大重试退避时间(毫秒)。重试之间的最大间隔默认为50秒。
long DEFAULT_MAX_RETRY_BACKOFF_MS = 50 * 1000L;
5.1.2. ProjectConfig()
此操作是初始化project的配置,这份配置是初始化client所需要的参数,其中需要传入下面四个参数。
ProjectConfig projectConfig = new ProjectConfig(logProject,endpoint,accessKey, secretKey) ;
5.1.3. BuildClient()
此操作是根据projectConfig配置初始化client,它会根据配置中的logProject属性去初始化一个client,不同的logProject会构建不同的client,使用不同的配置就可以构建多个client,每个client负责该project项目下的日志发送任务。
//根据不同配置生成不同client,日志上传互不影响
producer.buildClient(projectConfig);
producer.buildClient(projectConfig2);
5.1.4. Close()
此操作是用于关闭producer。当不再需要发送数据或当前进程即将终止时,关闭producer是必要的步骤,以确保producer中缓存的所有数据都能得到妥善处理。当前,producer提供了两种关闭模式:安全关闭和有限关闭。
安全关闭模式确保在关闭producer之前,所有缓存的数据都已完成处理,所有相关线程都已关闭,并且所有注册的回调函数都已执行完毕。一旦producer被安全关闭,缓存的批次数据会立即得到处理,并且不会被重试。如果回调函数没有被阻塞,close方法通常能够迅速返回。
Thread.sleep(5000);
producer.close();
有限关闭模式适用于那些可能存在阻塞回调函数的场景,但您又希望close方法能在指定的时间内返回。为此,可以使用close(long timeoutMs)方法,并指定一个超时时间。如果超过了指定的timeoutMs时间后producer仍未完全关闭,该方法将抛出一个IllegalStateException异常,这意味着可能还有部分缓存的数据未及时处理就被丢弃,同时用户注册的回调函数也可能不会被执行。
producer.close(5000);
5.2.关于异步发送操作
5.2.1. SendLogs()
此操作是将日志发送到后台的日志累加器队列中,然后立刻返回。累加器的状态达到可发送条件时(日志量达到阈值或者等待时间达到阈值),后台任务的线程将里面的日志进行打包批量发送。
producer.sendLogs(logProject, logUnit, logItem, customerCallback());
//或者
producer.sendLogs(logProject, logUnit, logItems);
sendLogs()方法有很多重载方法,可以满足多种类型的发送,既可以发送单条日志,也可以发送多条日志,同时也可以根据需求是否需要结果返回值。类型如下:
sendLogs(String logProject, String logUnit, LogItem logItem)
sendLogs(String logProject, String logUnit, List<LogItem> logItems)
sendLogs(String logProject, String logUnit, LogItem logItem, Callback callback)
sendLogs(String logProject, String logUnit, List<LogItem> logItems, Callback callback)
...
5.3.关于获取发送结果的操作
由于 producer 提供的所有发送方法都是异步的,需要通过返回的 future 或者传入的 callback 获取发送结果。
5.3.1. Future
SendLogs 方法会返回一个 ListenableFuture,它除了可以像普通 future 那样通过调用 get 方法阻塞获得发送结果外,还允许注册回调方法(回调方法会在完成 future 设置后被调用)。
以下代码片段展示了 ListenableFuture 的使用方法,ExexutorFuture.java 对sendLogs()进行了回调结果的封装,现在只需要调用executorFuture.executeTask()就可自动返回回调结果。
int threadNum = 4;
ExecutorFuture executorFuture = new ExecutorFuture(producer,threadNum);
for (int i = 0; i < 1000; i++) {
executorFuture.executeTask(logProject, logUnit, Utils.generateLogItem(5));
}
Thread.sleep(5000); //关闭producer前,等待5s
producer.close();
executorFuture.shutdown();
用户可以自定义回调函数,为该 future 注册一个 FutureCallback 并将其投递到应用提供的线程池 executorService中执行,完整样例可参考 SamplePutLogsFuture.java、ExexutorFuture.java。
ListenableFuture<Result> future = producer.sendLogs(logProject, logUnit, logItem);
Futures.addCallback(future, new FutureCallback<Result>() {
@Override
public void onSuccess(Result result) {
System.out.println("response : " + result.getErrorMessage()+" , Code:"+result.getErrorCode());
}
@Override
public void onFailure(Throwable t) {
System.err.println("response : " + t);
}
}, executorService); // 在执行器服务的线程中执行回调
5.3.2. Callback
除了使用 future 外,您还可以通过在调用 send 方法时注册 callback 获取数据发送结果,代码片段如下。(完整样例可参考 SamplePutLogsCallback.java)
producer.sendLogs(logProject, logUnit, logItem, customCallback());
private static Callback customCallback() {
Callback callback = new Callback() {
@Override
public void onCompletion(Result result) {
// 处理回调结果
if (result.isSuccessful()) {
LOGGER.info("response: {} , {}", result.getErrorMessage(), result.getErrorCode());
} else {
LOGGER.info("response: {} , {}", result.getErrorMessage(), result.getErrorCode());
}
}
};
return callback;
}