
Elasticsearch: A Brief Introduction and Basic Operations

2023-08-30 06:20:47


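Elasticsearch (ES) is an open-source distributed search and analytics engine built on the Lucene library. It is designed to handle large data sets and offers high performance, scalability, and powerful full-text search.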

The basic principles and architecture of ES are as follows:

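1. Distributed storage: ES spreads data across multiple nodes, and each node can hold multiple shards. Each shard is a self-contained Lucene index that holds part of the data. This layout lets ES handle large data sets and provides high availability and fault tolerance.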

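2. Inverted index: ES uses an inverted index to speed up searches. An inverted index is a data structure that maps each term appearing in the documents to the list of documents that contain it. With it, ES can quickly locate the documents containing a keyword, which is what makes full-text search efficient.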

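3. Distributed search: When a user issues a search request, the node that receives it acts as the coordinating node and forwards the request to one copy (primary or replica) of every shard of the target index. Each of those shards runs the query locally and returns its matching results. The coordinating node then merges the per-shard results, sorts them by relevance, and returns the final result to the user.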

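4. Near-real-time analytics: ES supports near-real-time data analysis and aggregation. It can run complex aggregations over large data sets, such as averages, sums, maximums, and minimums. ES uses distributed computation and caching to improve aggregation performance.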

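5. High availability and fault tolerance: ES replicates each shard and places the copies on different nodes. When a node fails, ES promotes replica shards on the surviving nodes to primaries and rebuilds the missing replicas on other nodes from those surviving copies, which keeps the data available.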

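6. APIs and plugin ecosystem: ES offers a rich set of APIs and a plugin ecosystem, so developers can integrate with it easily and customize or extend it as needed. Developers can talk to ES through the RESTful API, the Java API, and so on, and use plugins to extend its functionality.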
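
To make the inverted index from item 2 more concrete, here is a minimal, self-contained Java sketch. It only illustrates the idea and is not how Lucene is implemented: the class name `InvertedIndexSketch`, the naive tokenizer, and the sample address strings are all assumptions made for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Toy inverted index (hypothetical): maps each term to the sorted set of
// document IDs that contain it. A real Lucene index also stores positions,
// frequencies and more.
class InvertedIndexSketch {
    private final Map<String, Set<Integer>> postings = new TreeMap<>();

    // Naive stand-in for an analyzer: lowercase, then split on anything
    // that is not a letter or a digit.
    static List<String> tokenize(String text) {
        List<String> tokens = new ArrayList<>();
        for (String t : text.toLowerCase(Locale.ROOT).split("[^\\p{L}\\p{N}]+")) {
            if (!t.isEmpty()) {
                tokens.add(t);
            }
        }
        return tokens;
    }

    // Index one document: register its ID under every term it contains.
    void add(int docId, String text) {
        for (String term : tokenize(text)) {
            postings.computeIfAbsent(term, k -> new TreeSet<>()).add(docId);
        }
    }

    // Return the IDs of documents that contain every term of the query.
    Set<Integer> search(String query) {
        Set<Integer> result = null;
        for (String term : tokenize(query)) {
            Set<Integer> docs = postings.getOrDefault(term, new TreeSet<>());
            if (result == null) {
                result = new TreeSet<>(docs);
            } else {
                result.retainAll(docs);
            }
        }
        return result == null ? new TreeSet<>() : result;
    }

    public static void main(String[] args) {
        InvertedIndexSketch index = new InvertedIndexSketch();
        index.add(1, "12 Fleet Street");
        index.add(2, "396 Grove Place");
        index.add(3, "975 Dakota Place");
        System.out.println(index.search("place"));       // prints [2, 3]
        System.out.println(index.search("Grove Place")); // prints [2]
    }
}
```

Looking up a term is a single map access, no matter how many documents were indexed; that is the property the inverted index is chosen for.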

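In short, the basic principles and architecture of ES comprise distributed storage, the inverted index, distributed search, near-real-time analytics, high availability and fault tolerance, and the API and plugin ecosystem. Together these make ES a powerful distributed search and analytics engine.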
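
The distributed search flow from item 3 can be pictured as a scatter-gather step: every shard returns its own best hits, and one node merges them into a global ranking. The Java sketch below shows only that merge step under simplifying assumptions; `ScatterGatherSketch`, `ShardHit`, and the scores are hypothetical names and values, not part of the ES client API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration of merging per-shard search results.
class ScatterGatherSketch {
    // One hit as reported by a single shard: document ID plus relevance score.
    static final class ShardHit {
        final String docId;
        final double score;

        ShardHit(String docId, double score) {
            this.docId = docId;
            this.score = score;
        }

        @Override
        public String toString() {
            return docId + ":" + score;
        }
    }

    // Merge the hit lists of all shards and keep the global top `size`
    // hits, ordered by descending score.
    static List<ShardHit> merge(List<List<ShardHit>> perShard, int size) {
        List<ShardHit> all = new ArrayList<>();
        for (List<ShardHit> hits : perShard) {
            all.addAll(hits);
        }
        all.sort(Comparator.comparingDouble((ShardHit h) -> h.score).reversed());
        return new ArrayList<>(all.subList(0, Math.min(size, all.size())));
    }

    public static void main(String[] args) {
        List<ShardHit> shard0 = Arrays.asList(new ShardHit("a", 2.5), new ShardHit("b", 1.0));
        List<ShardHit> shard1 = Arrays.asList(new ShardHit("c", 3.0), new ShardHit("d", 0.5));
        System.out.println(merge(Arrays.asList(shard0, shard1), 3)); // prints [c:3.0, a:2.5, b:1.0]
    }
}
```

The sketch also hints at why deep pagination is expensive: to return hits `from` through `from + size`, every shard has to supply its own top `from + size` candidates before the merge.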

 

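Beyond the principles, hands-on practice with the basic operations is just as essential. The demo below walks through some basic operations; the source data can be downloaded, and the code can be copied and run locally.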

 

The ES guide documentation and the test data come from the following sources:

package com.example.demo.ES;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 *
 * Data preparation:
 * Run the following manually in Kibana Dev Tools:
 * 1. Create an index named "account"
 * 2. Create the mapping
 * 3. Import the downloaded test data
 *
 * POM:
 * <dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.5.3</version></dependency>
 * <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.17.4</version></dependency>
 * <dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.12.3</version></dependency>
 * <dependency><groupId>jakarta.json</groupId><artifactId>jakarta.json-api</artifactId><version>2.0.1</version></dependency>
 */
public class ESSearchTest
{
    private static Logger logger = LoggerFactory.getLogger(ESSearchTest.class);

    static ElasticsearchClient client = null;
    static String host = "localhost";
    static int port = 9200;

    public static synchronized ElasticsearchClient getClient()
    {
        // Lazily create the client on first use and cache it for later calls.
        if (client == null)
        {
            client = new ElasticsearchClient(
                    new RestClientTransport(
                            RestClient.builder(new HttpHost(host, port)).build(),
                            new JacksonJsonpMapper()));
        }
        return client;
    }

    String indexName = "account";

    @Test
    public void queryAll() throws IOException
    {
        ElasticsearchClient client = ESSearchTest.getClient();
        //1. No query specified: matches all documents, hits mapped to Map
        SearchResponse<Map> searchResponse = client.search(searchRequestBuilder -> searchRequestBuilder.index(indexName), Map.class);
        logger.info("time cost : {}", searchResponse.took());
        logger.info("size : {}", searchResponse.hits().total().value());

        //2. Explicit match_all query, hits mapped to Map
        SearchResponse<Map> searchResponse2 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(querBuilder -> querBuilder
                                .matchAll(matchAllQueryBuilder -> matchAllQueryBuilder))
                , Map.class);
        logger.info("time cost : {}", searchResponse2.took());
        logger.info("size : {}", searchResponse2.hits().total().value());

        //3. match_all query, hits deserialized into Account
        SearchResponse<Account> searchResponse3 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(querBuilder -> querBuilder
                                .matchAll(matchAllQueryBuilder -> matchAllQueryBuilder))
                , Account.class);
        logger.info("time cost : {}", searchResponse3.took());
        logger.info("size : {}", searchResponse3.hits().total().value());
        List<Hit<Account>> hits = searchResponse3.hits().hits();
        List<Account> accounts = new ArrayList<Account>();
        for (Hit<Account> hit : hits)
        {
            accounts.add(hit.source());
        }
        logger.info("query accounts size: {}", accounts.size());
        logger.info(accounts.toString());
    }


    /**
     * Pagination
     * Sorting
     * match query
     *
     * @throws IOException
     */
    @Test
    public void queryAccount() throws IOException
    {
        //query conditions
        AccountSearchQuery accountSearchQuery = new AccountSearchQuery();
        accountSearchQuery.setPageNo(1);
        accountSearchQuery.setPageSize(10);
        accountSearchQuery.setState("ND");

        String indexName = "account";
        ElasticsearchClient client = ESSearchTest.getClient();
        SearchResponse<Account> searchResponse = client.search(searchRequestBuilder -> searchRequestBuilder
                        //index name
                        .index(indexName)
                        //match query
                        .query(queryBuilder -> queryBuilder
                                .match(matchQueryBuilder -> matchQueryBuilder
                                        .field("state").query(accountSearchQuery.getState())))

                        //pagination
                        .from(parsrePageFrom(accountSearchQuery.getPageNo(), accountSearchQuery.getPageSize()))
                        .size(accountSearchQuery.getPageSize())
                        //sorting
                        .sort(sortOptionsBuilder -> sortOptionsBuilder
                                .field(fieldSortBuilder -> fieldSortBuilder
                                        .field("age").order(SortOrder.Asc))),
                Account.class);
        logger.info("time cost : {}", searchResponse.took());
        HitsMetadata<Account> hitsMetadata = searchResponse.hits();
        logger.info("size : {}", hitsMetadata.total().value());
        List<Hit<Account>> hits = hitsMetadata.hits();
        List<Account> results = new ArrayList<Account>();
        for (Hit<Account> hit : hits)
        {
            results.add(hit.source());
        }
        logger.info("account size:{}", results.size());
        logger.info(results.toString());
    }

    @Test
    public void queryAccount2() throws IOException
    {
        String city = "Leming";
        String state = "ND";
        Integer pageNo = 1;
        Integer pageSize = 10;
        Integer[] ageRange = new Integer[]{22,35};


        //query conditions
        AccountSearchQuery accountSearchQuery = new AccountSearchQuery();
        accountSearchQuery.setPageNo(pageNo);
        accountSearchQuery.setPageSize(pageSize);
        accountSearchQuery.setState(state);
        accountSearchQuery.setCity(city);
        accountSearchQuery.setAgeRange(ageRange);

        List<Account> accounts = doQueryAccount(accountSearchQuery);
        logger.info(accounts.toString());

    }
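    /**
     * bool compound query
     * source filtering (restrict the returned fields)
     * range query
     *
     * Helper invoked by queryAccount2(). It is not a test itself, since a JUnit 5
     * test method must return void and cannot take unresolved parameters.
     * @throws IOException
     */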
    public List<Account> doQueryAccount(AccountSearchQuery accountSearchQuery)
            throws IOException
    {
        ElasticsearchClient client = ESSearchTest.getClient();
        SearchResponse<Account> searchResponse = client.search(searchRequestBuilder -> searchRequestBuilder
                        //index name
                        .index(indexName)
                        //bool query: every must clause below has to match
                        .query(queryBuilder -> queryBuilder
                                .bool(boolQueryBuilder -> boolQueryBuilder.
                                        //must
                                        must(queryBuilder2 -> queryBuilder2
                                                //match
                                                .match(matchQueryBuilder -> matchQueryBuilder
                                                        .field("state").query(accountSearchQuery.getState())))
                                        //must
                                        .must(queryBuilder3 -> queryBuilder3
                                                //match
                                                .match(matchQueryBuilder2 -> matchQueryBuilder2
                                                        .field("city").query(accountSearchQuery.getCity())))
                                        //must
                                        .must(queryBuilder4 -> queryBuilder4
                                                //range
                                                .range(rangeQueryBuilder -> rangeQueryBuilder
                                                        .field("age")
                                                            .gt(JsonData.of(accountSearchQuery.getAgeRange()[0]))
                                                            .lt(JsonData.of(accountSearchQuery.getAgeRange()[1]))))))
                        //source filtering
                        .source(sourceConfigBuilder -> sourceConfigBuilder
                                .filter(sourceFilterBuilder  -> sourceFilterBuilder
                                        //include
                                        .includes("firstname","age", "state", "city")
                                        //exclude
                                        .excludes("email")))
                        //pagination
                        .from(parsrePageFrom(accountSearchQuery.getPageNo(), accountSearchQuery.getPageSize()))
                        .size(accountSearchQuery.getPageSize())
                        //sorting
                        .sort(sortOptionsBuilder -> sortOptionsBuilder
                                .field(fieldSortBuilder -> fieldSortBuilder
                                        .field("age").order(SortOrder.Asc))),
                Account.class);
        HitsMetadata<Account> hitsMetadata = searchResponse.hits();
        List<Hit<Account>> hits = hitsMetadata.hits();
        List<Account> results = new ArrayList<Account>();
        for (Hit<Account> hit : hits)
        {
            results.add(hit.source());
        }
        return results;
    }

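    /**
     * match_all: match every document
     * match: full-text search; the input is analyzed (tokenized) before querying
     * multi_match: run the same query against multiple fields
     * match_phrase: match the analyzed terms as a phrase (adjacent and in order)
     * term: exact match without analysis (not well suited to text fields)
     * terms: match any of several values on a single field
     * bool: compound query
     * fuzzy: fuzzy query
     * query_string
     * simple_query_string
     *
     * @throws IOException
     */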
    @Test
    public void queryAccount3() throws IOException
    {
        String indexName = "account";
        ElasticsearchClient client = ESSearchTest.getClient();
        //match_all: match every document
        SearchResponse<Account> s1 = client.search(
                searchRequestBuilder -> searchRequestBuilder.index(indexName)
                        .query(q -> q.matchAll(m -> m)), Account.class);
        logger.info("s1 size:{}, data:{}", s1.hits().total().value(), s1.toString());
        //match: full-text search; the input is analyzed (tokenized) before querying
        SearchResponse<Account> s2 = client.search(searchRequestBuilder -> searchRequestBuilder
                .index(indexName)
                        .query(q -> q.match(m -> m.field("address").query("Fleet")))
                , Account.class);
        logger.info("s2 size:{}, data:{}", s2.hits().total().value(), s2.toString());
        //multi_match: run the same query against multiple fields
        SearchResponse<Account> s3 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.multiMatch(m -> m.fields("firstname", "lastname").query("Garrett")))
                , Account.class);
        logger.info("s3 size:{}, data:{}", s3.hits().total().value(), s3.toString());
        //match_phrase: the input is analyzed, and the resulting terms must appear adjacent and in the same order
        SearchResponse<Account> s4 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.matchPhrase(m -> m.field("address").query("396 Grove Place")))
                , Account.class);
        logger.info("s4 size:{}, data:{}", s4.hits().total().value(), s4.toString());
        //term: exact match, the input is not analyzed
        SearchResponse<Account> s5 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.term(t -> t.field("age").value(40)))
                , Account.class);
        logger.info("s5 size:{}, data:{}", s5.hits().total().value(), s5.toString());
        //terms: match any of several values on a single field
        List<FieldValue> values = new ArrayList<>();
        values.add(new FieldValue.Builder().anyValue(JsonData.of(40)).build());
        values.add(new FieldValue.Builder().anyValue(JsonData.of(39)).build());
        SearchResponse<Account> s6 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.terms(t -> t.field("age").terms(ts -> ts.value(values))))
                , Account.class);
        logger.info("s6 size:{}, data:{}", s6.hits().total().value(), s6.toString());
        //bool: compound query
        SearchResponse<Account> s7 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q
                                .bool(m -> m
                                        .must(q2 -> q2
                                                .match(q3 -> q3.field("age").query(34)))
                                        .mustNot(q3 -> q3
                                                .term(t -> t.
                                                        field("state").value("DC")))
                                        .should(q4 -> q4.matchPhrase(m2 -> m2
                                                .field("address").query("975 Dakota Place")))))
                , Account.class);
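        logger.info("s7 size:{}, data:{}", s7.hits().total().value(), s7.toString());
        //fuzzy matching
        //fuzziness is the maximum allowed edit distance; for prefix_length, max_expansions and other parameters, see the documentation
        //match with fuzziness: the input is analyzed first, then matched fuzzily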
        SearchResponse<Account> s8 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.match(m -> m.field("firstname").query("Bush").fuzziness("1")))
                , Account.class);
        logger.info("s8 size:{}, data:{}", s8.hits().total().value(), s8.toString());
        //fuzzy: fuzzy match on the raw input, without analysis
        SearchResponse<Account> s9 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.fuzzy(f -> f.field("firstname").fuzziness("1").value("Bush")))
                , Account.class);
        logger.info("s9 size:{}, data:{}", s9.hits().total().value(), s9.toString());
        //query_string
        //similar to match
        SearchResponse<Account> s10 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.queryString(qs -> qs.defaultField("address").query("Fleet")))
                , Account.class);
        logger.info("s10 size:{}, data:{}", s10.hits().total().value(), s10.toString());
        //similar to multi_match
        SearchResponse<Account> s11 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.queryString(qs -> qs.fields("firstname", "lastname").query("Garrett")))
                , Account.class);
        logger.info("s11 size:{}, data:{}", s11.hits().total().value(), s11.toString());
        //similar to match_phrase
        SearchResponse<Account> s12 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.queryString(qs -> qs.defaultField("address").query("\"396 Grove Place\"")))
                , Account.class);
        logger.info("s12 size:{}, data:{}", s12.hits().total().value(), s12.toString());
        //operators
        //contains both Grove and Place
        SearchResponse<Account> s13 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.queryString(qs -> qs.fields("address").query("Grove AND Place")))
                , Account.class);
        logger.info("s13 size:{}, data:{}", s13.hits().total().value(), s13.toString());
        //contains both Grove and Place, or contains cobbhumphrey@apexia.com
        SearchResponse<Account> s14 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.queryString(qs -> qs
                                .fields("address", "email").query("(Grove AND Place) OR cobbhumphrey@apexia.com")))
                , Account.class);
        logger.info("s14 size:{}, data:{}", s14.hits().total().value(), s14.toString());
        //contains Grove or Place: terms separated by whitespace are combined with the default operator, which is OR
        SearchResponse<Account> s15 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.queryString(qs -> qs.fields("address").query("Grove Place")))
                , Account.class);
        logger.info("s15 size:{}, data:{}", s15.hits().total().value(), s15.toString());
        //simple_query_string
        //unlike query_string, the AND/OR keywords are not supported; use + (AND), | (OR), - (NOT) instead
        SearchResponse<Account> s16 = client.search(searchRequestBuilder -> searchRequestBuilder
                        .index(indexName)
                        .query(q -> q.simpleQueryString(sqs -> sqs.fields("address").query("Grove + Place")))
                , Account.class);
        logger.info("s16 size:{}, data:{}", s16.hits().total().value(), s16.toString());
    }

    /**
     * Aggregation queries
     * count
     * sum
     * max
     * min
     * average
     * group by
     * distinct count
     * @throws IOException
     */
    @Test
    public void queryAccount4() throws IOException
    {
        String indexName = "account";
        ElasticsearchClient client = ESSearchTest.getClient();
        //max, min, average, sum
        SearchResponse<Account> s1 = client.search(
                searchRequestBuilder -> searchRequestBuilder.index(indexName)
                        //max
                        .aggregations("maxAge", aggregationBuilder -> aggregationBuilder
                                .max(maxAggregationBuilder -> maxAggregationBuilder.field("age")))
                        //min
                        .aggregations("minAge", aggregationBuilder -> aggregationBuilder
                                .min(minAggregationBuilder -> minAggregationBuilder.field("age")))
                        //average
                        .aggregations("avgAge", aggregationBuilder -> aggregationBuilder
                                .avg(avgAggregationBuilder -> avgAggregationBuilder.field("age")))
                        //sum
                        .aggregations("sumAge", aggregationBuilder -> aggregationBuilder
                                .sum(sumAggregationBuilder -> sumAggregationBuilder.field("age")))
                , Account.class);
        logger.info("s1 size:{}, data:{}", s1.hits().total().value(), s1.toString());

        Map<String, Aggregate> aggregations = s1.aggregations();
        Aggregate maxAge = aggregations.get("maxAge");
        Aggregate minAge = aggregations.get("minAge");
        Aggregate avgAge = aggregations.get("avgAge");
        Aggregate sumAge = aggregations.get("sumAge");

        logger.info("max age : {}", maxAge.max().value());
        logger.info("min age : {}", minAge.min().value());
        logger.info("avg age : {}", avgAge.avg().value());
        logger.info("sum age : {}", sumAge.sum().value());

        //stats: count, max, min, average and sum in a single aggregation
        SearchResponse<Account> s2 = client.search(
                searchRequestBuilder -> searchRequestBuilder.index(indexName)
                        .aggregations("accountNumberStat", aggregationBuilder -> aggregationBuilder
                                //stats
                                .stats(statsAggregationBuilder -> statsAggregationBuilder.field("account_number")))
                , Account.class);
        logger.info("s2 size:{}, data:{}", s2.hits().total().value(), s2.toString());
        Aggregate ans = s2.aggregations().get("accountNumberStat");
        logger.info("account number counts:{}", ans.stats().count());
        logger.info("account number sum:{}", ans.stats().sum());
        logger.info("account number max:{}", ans.stats().max());
        logger.info("account number min:{}", ans.stats().min());
        logger.info("account number avg:{}", ans.stats().avg());
        //distinct count
        SearchResponse<Account> s3 = client.search(
                searchRequestBuilder -> searchRequestBuilder.index(indexName)
                        .aggregations("cityDistinct", aggregationBuilder -> aggregationBuilder
                                //cardinality: approximate number of distinct values
                                .cardinality(cardinalityAggregationBuilder  -> cardinalityAggregationBuilder
                                        .field("city")))
                , Account.class);
        logger.info("s3 size:{}, data:{}", s3.hits().total().value(), s3.toString());
        Aggregate aggregation = s3.aggregations().get("cityDistinct");
        CardinalityAggregate cityDistinct = aggregation.cardinality();
        logger.info("distinct city number:{}", cityDistinct.value());
        //group by
        SearchResponse<Map> s4 = client.search(
                searchRequestBuilder -> searchRequestBuilder.index(indexName)
                        .aggregations("stateAgg", aggregationBuilder -> aggregationBuilder
                                .terms(termsAggregation -> termsAggregation
                                        .field("state")))
                , Map.class);
        logger.info("s4 size:{}, data:{}", s4.hits().total().value(), s4.toString());
        List<StringTermsBucket> bs = s4.aggregations().get("stateAgg").sterms().buckets().array();
        for (StringTermsBucket b : bs)
        {
            logger.info("key:{}, value:{}", b.key().stringValue(), b.docCount());
        }
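        /**
         * A more complex statistic.
         * What it does:
         * Query documents whose age is between 20 and 30 (exclusive) and whose address contains "Street"
         * Aggregate over the matching documents
         * Group by state (assume this field is the region, i.e. the state name)
         * For each state, compute the max, min and average salary and the head count (one document is assumed to be one person)
         * Within each state, split by gender and compute the max, min and average salary and the head count for each gender
         * Note: the code aggregates on account_number as a stand-in for a salary field
         */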
        SearchResponse<Map> s5 = client.search(
                searchRequestBuilder -> searchRequestBuilder.index(indexName)
                        .query(queryBuilder -> queryBuilder
                                .bool(boolQueryBuilder -> boolQueryBuilder
                                        .must(queryBuilder2 -> queryBuilder2
                                                //address contains "Street"
                                                .match(matchQueryBuilder -> matchQueryBuilder
                                                        .field("address").query("Street")))
                                        .must(queryBuilder3 -> queryBuilder3
                                                //age between 20 and 30 (gt/lt are exclusive bounds)
                                                .range(rangeBuilder -> rangeBuilder
                                                        .field("age").lt(JsonData.of(30)).gt(JsonData.of(20))))))
                        //aggregations
                        .aggregations("stateAgg", aggregationBuilder -> aggregationBuilder
                                //group by state
                                .terms(termsAggregation -> termsAggregation.field("state"))
                                //max salary per state
                                .aggregations("stateAccountMaxAgg", aggregationBuilder2 -> aggregationBuilder2.max(queryBuilder4 -> queryBuilder4.field("account_number")))
                                //min salary per state
                                .aggregations("stateAccountMinAgg", aggregationBuilder3 -> aggregationBuilder3.min(queryBuilder5 -> queryBuilder5.field("account_number")))
                                //average salary per state
                                .aggregations("stateAccountAvgAgg", aggregationBuilder4 -> aggregationBuilder4.avg(queryBuilder6 -> queryBuilder6.field("account_number")))
                                //nested sub-aggregation
                                .aggregations("genderAgg", aggregationBuilder5 -> aggregationBuilder5
                                        //group by gender
                                        .terms(termsAggregation2 -> termsAggregation2.field("gender"))
                                        //max salary for this gender
                                        .aggregations("GenderAccountMaxAgg", aggregationBuilder6 -> aggregationBuilder6
                                                .max(queryBuilder7 -> queryBuilder7.field("account_number")))
                                        .aggregations("GenderAccountMinAgg", aggregationBuilder7 -> aggregationBuilder7
                                                .min(queryBuilder8 -> queryBuilder8.field("account_number")))
                                        .aggregations("GenderAccountAvgAgg", aggregationBuilder8 -> aggregationBuilder8
                                                .avg(queryBuilder9 -> queryBuilder9.field("account_number")))))
                , Map.class);
        logger.info("s5 size:{}, data:{}", s5.hits().total().value(), s5.toString());
        List<StringTermsBucket> buckets = s5.aggregations().get("stateAgg").sterms().buckets().array();
        for (StringTermsBucket b : buckets)
        {
            StringBuilder s = new StringBuilder();
            s.append("State ").append(b.key().stringValue())
                    .append(" has ").append(b.docCount()).append(" people. ")
                    .append("Account max:").append(b.aggregations().get("stateAccountMaxAgg").max().value())
                    .append(", min:").append(b.aggregations().get("stateAccountMinAgg").min().value())
                    .append(", avg:").append(b.aggregations().get("stateAccountAvgAgg").avg().value())
                    .append(". ");
            List<StringTermsBucket> list = b.aggregations().get("genderAgg").sterms().buckets().array();
            for (StringTermsBucket l : list)
            {
                s.append("Gender: ").append(l.key().stringValue())
                        .append(" , account max:").append(l.aggregations().get("GenderAccountMaxAgg").max().value())
                        .append(",min:").append(l.aggregations().get("GenderAccountMinAgg").min().value())
                        .append(",avg:").append(l.aggregations().get("GenderAccountAvgAgg").avg().value())
                        .append(".    ");
            }
            logger.info(s.toString());

        }
    }

    private Integer parsrePageFrom(Integer pageNo, Integer pageSize)
    {
        if (pageNo != null && pageSize != null && pageSize.intValue() > 0)
        {
            return (pageNo.intValue() > 0 ? pageNo.intValue() - 1 : 0)
                    * pageSize.intValue();

        }
        return 0;
    }

    public class AccountSearchQuery
    {
        public Integer pageNo;
        public Integer pageSize;

        public String city;

        public String state;

        public Integer[] ageRange;

        public Integer getPageNo()
        {
            return pageNo;
        }

        public void setPageNo(Integer pageNo)
        {
            this.pageNo = pageNo;
        }

        public Integer getPageSize()
        {
            return pageSize;
        }

        public void setPageSize(Integer pageSize)
        {
            this.pageSize = pageSize;
        }

        public String getCity()
        {
            return city;
        }

        public void setCity(String city)
        {
            this.city = city;
        }

        public String getState()
        {
            return state;
        }

        public void setState(String state)
        {
            this.state = state;
        }

        public Integer[] getAgeRange()
        {
            return ageRange;
        }

        public void setAgeRange(Integer[] ageRange)
        {
            this.ageRange = ageRange;
        }
    }

    public static class Account
    {
        private Integer account_number;

        private Integer balance;

        private String firstname;

        private String lastname;

        private Integer age;

        private String gender;

        private String address;

        private String employer;

        private String email;

        private String city;

        private String state;

        public Account()
        {
        }

        public Account(Integer account_number, Integer balance,
                String firstname, String lastname, Integer age, String gender,
                String address, String employer, String email, String city,
                String state)
        {
            this.account_number = account_number;
            this.balance = balance;
            this.firstname = firstname;
            this.lastname = lastname;
            this.age = age;
            this.gender = gender;
            this.address = address;
            this.employer = employer;
            this.email = email;
            this.city = city;
            this.state = state;
        }

        public void setAccount_number(Integer account_number){
            this.account_number = account_number;
        }
        public Integer getAccount_number(){
            return this.account_number;
        }
        public void setBalance(Integer balance){
            this.balance = balance;
        }
        public Integer getBalance(){
            return this.balance;
        }
        public void setFirstname(String firstname){
            this.firstname = firstname;
        }
        public String getFirstname(){
            return this.firstname;
        }
        public void setLastname(String lastname){
            this.lastname = lastname;
        }
        public String getLastname(){
            return this.lastname;
        }
        public void setAge(Integer age){
            this.age = age;
        }
        public Integer getAge(){
            return this.age;
        }
        public void setGender(String gender){
            this.gender = gender;
        }
        public String getGender(){
            return this.gender;
        }
        public void setAddress(String address){
            this.address = address;
        }
        public String getAddress(){
            return this.address;
        }
        public void setEmployer(String employer){
            this.employer = employer;
        }
        public String getEmployer(){
            return this.employer;
        }
        public void setEmail(String email){
            this.email = email;
        }
        public String getEmail(){
            return this.email;
        }
        public void setCity(String city){
            this.city = city;
        }
        public String getCity(){
            return this.city;
        }
        public void setState(String state){
            this.state = state;
        }
        public String getState(){
            return this.state;
        }

        @Override public String toString()
        {
            return "Account{" + "account_number=" + account_number
                    + ", balance=" + balance + ", firstname='" + firstname
                    + '\'' + ", lastname='" + lastname + '\'' + ", age=" + age
                    + ", gender='" + gender + '\'' + ", address='" + address
                    + '\'' + ", employer='" + employer + '\'' + ", email='"
                    + email + '\'' + ", city='" + city + '\'' + ", state='"
                    + state + '\'' + '}';
        }
    }

}
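
The `doQueryAccount` search earlier in this listing restricts the returned fields with a source filter, so fields such as `balance` are never set on the resulting `Account` objects and remain `null`. The minimal sketch below illustrates why a getter over an `Integer` field is safer when it returns `Integer` rather than `int`. The class `MiniAccount` and the method `getBalanceUnsafe` are hypothetical names introduced only for this illustration; they are not part of the original code.

```java
// Hypothetical, trimmed-down illustration; not part of the original demo.
class NullSafeGetterSketch {

    // Stand-in for the Account class, reduced to two wrapper-typed fields.
    public static class MiniAccount {
        private Integer balance; // stays null when the search does not return it
        private Integer age;

        public void setAge(Integer age) {
            this.age = age;
        }

        // Primitive return type: the field is unboxed, which throws if it is null.
        public int getBalanceUnsafe() {
            return this.balance;
        }

        // Wrapper return type: a null field is handed to the caller unchanged.
        public Integer getBalance() {
            return this.balance;
        }

        public Integer getAge() {
            return this.age;
        }
    }

    public static void main(String[] args) {
        MiniAccount account = new MiniAccount();
        account.setAge(30); // only "age" is populated, as with a source filter

        System.out.println("age = " + account.getAge());
        System.out.println("balance = " + account.getBalance());
        try {
            account.getBalanceUnsafe();
        } catch (NullPointerException e) {
            System.out.println("unboxing a null Integer throws NullPointerException");
        }
    }
}
```

Callers that need a primitive can still apply an explicit default at the call site, for example `account.getBalance() == null ? 0 : account.getBalance()`.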
This article is from the author's personal column: ELK