阅读文本大概需要3分钟。
Elasticsearch除了可以文档Index操作外,也提供了一次可以操作多个文档Index的API,上一篇已经把单文档的说了,从今天起说一说多文档Index操作。多文档操作的API统称Multi-document APIs
1、 一次性获取多个Index
public static void main(String[] args) throws IOException {
//onstartup
TransportClientclient =getTransportClient();
//继续添加其他地址
MultiGetResponsemultiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1","2")
.get();
for(MultiGetItemResponseitemResponse : multiGetItemResponses) {
GetResponse response =itemResponse.getResponse();
if(response.isExists()){
String json =response.getSourceAsString();
System.out.println(json);
}
}
//onshutdown
client.close();
}
运行结果
{"user":"kimchy","postDate":"2013-01-30","message":"tryingout Elasticsearch"}
{"user":"kimchy","postDate":"2013-01-30","message":"tryingout Elasticsearch"}
2、 Bulk API,又称批量API允许在单个请求中索引和删除多个文档
public static void main(String[] args) throws IOException {
//on startup
TransportClient client = getTransportClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("twitter", "_doc", "3")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("user", "zhangsan")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("twitter", "_doc", "4")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("user", "lisi")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);
BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
}
//on shutdown
client.close();
}
通过以上代码就可以插入两条Index,可以通过查询Index的API查询到插入成功。
3、Using Bulk Processor,BulkProcessor提供一个基于请求数量和大小或者某个特定时间之后的自动刷新批处理操作接口
BulkProcessor bulkProcessor = BulkProcessor.builder(
client, //增加elasticsearch客户端
new BulkProcessor.Listener() {
@Override
publicvoid beforeBulk(long executionId,BulkRequest request) {
...
}
//调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions
@Override
publicvoid afterBulk(long executionId,BulkRequest request, BulkResponse response)
{
...
}
//调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败
@Override
publicvoid afterBulk(long executionId,BulkRequest request,Throwable failure) {
...
}// 调用失败抛 Throwable
}).setBulkActions(10000) //每次10000请求
.setBulkSize(new ByteSizeValue(5,ByteSizeUnit.MB)) //拆成5mb一块
.setFlushInterval(TimeValue.timeValueSeconds(5))//无论请求数量多少,每5秒钟请求一次。
.setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100),3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
.build();