Elasticsearch去重查询

at 4年前  ca elasticsearch  pv 4026  by touch  

1、前言

最近遇到一个es数据查询去重的需求,我这边需要的是获取去重后的结果,查询官网资料和各位大神的经验后,总结一下elasticsearch去重查询功能。


2、Elasticsearch去重功能

关系型数据库中,比如MySQL,可以通过distinct进行去重,一般分为两种:

1 ) 统计去重后的数量

select distinct(count(1)) from test;

2 ) 获取去重后的结果

select distinct name,sex from person;

test,person为对应的表名。


Elasticsearch类似功能的实现方式

1 ) es查询结果进行去重计数


es的去重计数工卡可以通过es的聚合功能+Cardinality聚合函数来实现


2 ) es查询结果去重后显示


去重显示有两种方式:


(1) 使用字段聚合+top_hits聚合方式


(2)使用collapse折叠功能(5.3后版本提供)


我这里使用的es是2.4的版本,JavaApi使用的是第一种方式,5.x,6.x的版本也可以使用


3、DSL源码

可以通过es head插件来进行查询测试,user_onoffline_log是有的索引名字,uid是其中的一个字段,uid_aggs是聚合的名字,可以随便自定义。


1)统计去重数目。

POST user_onoffline_log/_search
{
  "query": {
    "match_all": {}
  },
  "size": 0,
  "aggs": {
    "uid_aggs": {
      "cardinality": {
        "field": "uid"
      }
    }
  }
}

结果:


{
  "took": 565,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 43326369,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "uid_aggs": {
      "value": 12243
    }
  }
}

可以看到uid字段去重后的计数值为12243。


2)返回去重内容

方式一:top_hits聚合


POST /user_onoffline_log/
{
  "query": {
    "match_all": {}
  },
  "aggs": {
    "uid_aggs": {
      "terms": {
        "field": "uid",
        "size": 1
      },
      "aggs": {
        "uid_top": {
          "top_hits": {
            "sort": [
              {
                "uid": {
                  "order": "desc"
                }
              }
            ],
            "size": 1
          }
        }
      }
    }
  },
  "size": 0
}

可以通过size设置显示的数量,上面aggs的size设置的1,结果:


{
    "took":2318,
    "timed_out":false,
    "_shards":{
        "total":200,
        "successful":5,
        "failed":195,
        "failures":[
            {
                "shard":0,
                "index":"stamgr-logstash-2018.03.06",
                "node":"kUbpfLDMRLuAnloYBBRGng",
                "reason":{
                    "type":"search_parse_exception",
                    "reason":"No mapping found for [uid] in order to sort on"
                }
            }
        ]
    },
    "hits":{
        "total":43326369,
        "max_score":0,
        "hits":[

        ]
    },
    "aggregations":{
        "uid_aggs":{
            "doc_count_error_upper_bound":1909,
            "sum_other_doc_count":2174528,
            "buckets":[
                {
                    "key":"",
                    "doc_count":41151839,
                    "uid_top":{
                        "hits":{
                            "total":41151839,
                            "max_score":null,
                            "hits":[
                                {
                                    "_index":"user_onoffline_log",
                                    "_type":"logs",
                                    "_id":"AWYDAu4otdWc46rphdZz",
                                    "_score":null,
                                    "_source":{
                                        "uid":"",
                                        "ip":"",
                                        "mac":"00:5a:39:ec:1c:e4",
                                        "ap_serial_id_online":"219801A0REM173004134",
                                        "ap_mac_online":"9c:06:1b:a8:18:e0",
                                        "ap_name_online":"lFkFq_xiangyundao_sihailujiaokou_W-E_AP02",
                                        "location_online":"祥云道与四海路交口由西向东第2点",
                                        "zone_online":"祥云道",
                                        "ap_serial_id_offline":"219801A0REM173004692",
                                        "ap_mac_offline":"9c:06:1b:a8:00:00",
                                        "ap_name_offline":"lFkFq_xiangyundao_sihailujiaokou_W-E_AP03",
                                        "location_offline":"祥云道与四海路交口由西向东第3点",
                                        "zone_offline":"祥云道",
                                        "area":"开发区",
                                        "area_id":"001",
                                        "province":"",
                                        "city":"",
                                        "corp":"",
                                        "type":"3",
                                        "online_time":"201809222310",
                                        "offline_time":"201809230440",
                                        "duration":330
                                    },
                                    "sort":[
                                        ""
                                    ]
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
}

方式二:折叠


POST /user_onoffline_log/
{
    "query":{
        "match_all":{

        }
    },
    "collapse":{
        "field":"uid"
    }
}

方式二较方式一:


简化;

性能好很多。

4、Java实现

1)统计去重数目

public class EsTest {
    public static void main(String[] args) {
        Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch") // 设置集群名
                .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上
                .build();
        TransportClient client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("101.10.32.1", 9300)));
                
         CardinalityBuilder cardinalityBuilder = AggregationBuilders.cardinality("uid_aggs").field("uid");


        SearchRequestBuilder request = client.prepareSearch("user_onoffline_log")
                .setTypes("logs")
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.boolQuery()
                        .must(QueryBuilders.termQuery("uid", "")))
                .addAggregation(cardinalityBuilder)
                .setSize(1);


        SearchResponse response = request.execute().actionGet();

        List<Aggregation> aggregationList = response.getAggregations().asList();

        for (Aggregation aggregation : aggregationList) {
            System.out.println(aggregation.getProperty("value"));
        }
    }
}


2)返回去重内容

public class EsTest {
    public static void main(String[] args) {
        Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch") // 设置集群名
                .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上
                .build();
        TransportClient client = TransportClient.builder().settings(settings).build()
                .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("101.10.32.1", 9300)));
    
        AggregationBuilder aggregationBuilder = AggregationBuilders
                .terms("uid_aggs").field("uid").size(10000)
                .subAggregation(AggregationBuilders.topHits("uid_top")
                        .addSort("offline_time", SortOrder.DESC)
                        .setSize(1));
        

        SearchRequestBuilder request = client.prepareSearch("user_onoffline_log")
                .setTypes("logs")
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                .setQuery(QueryBuilders.boolQuery()
                        .must(QueryBuilders.termQuery("uid", "")))
                .addAggregation(aggregationBuilder)
                .setSize(1);


        SearchResponse response = request.execute().actionGet();
        Terms genders = response.getAggregations().get("uid_aggs");
        for (Terms.Bucket entry : genders.getBuckets()) {
            TopHits top = entry.getAggregations().get("uid_top");
            for (SearchHit hit : top.getHits()) {
                System.out.println(hit.getSource());
            }
        }
    }
}


版权声明

本文仅代表作者观点,不代表码农殇立场。
本文系作者授权码农殇发表,未经许可,不得转载。

 

扫一扫在手机阅读、分享本文

已有0条评论
您是本站第12434名访客 今日有0篇新文章 当前在线 17 人