Elasticsearch去重查询
at 4年前 ca elasticsearch pv 4026 by touch
1、前言
最近遇到一个es数据查询去重的需求,我这边需要的是获取去重后的结果,查询官网资料和各位大神的经验后,总结一下elasticsearch去重查询功能。
2、Elasticsearch去重功能
关系型数据库中,比如MySQL,可以通过distinct进行去重,一般分为两种:
1 ) 统计去重后的数量
select distinct(count(1)) from test;
2 ) 获取去重后的结果
select distinct name,sex from person;
test,person为对应的表名。
Elasticsearch类似功能的实现方式
1 ) es查询结果进行去重计数
es的去重计数工卡可以通过es的聚合功能+Cardinality聚合函数来实现
2 ) es查询结果去重后显示
去重显示有两种方式:
(1) 使用字段聚合+top_hits聚合方式
(2)使用collapse折叠功能(5.3后版本提供)
我这里使用的es是2.4的版本,JavaApi使用的是第一种方式,5.x,6.x的版本也可以使用
3、DSL源码
可以通过es head插件来进行查询测试,user_onoffline_log是有的索引名字,uid是其中的一个字段,uid_aggs是聚合的名字,可以随便自定义。
1)统计去重数目。
POST user_onoffline_log/_search { "query": { "match_all": {} }, "size": 0, "aggs": { "uid_aggs": { "cardinality": { "field": "uid" } } } }
结果:
{ "took": 565, "timed_out": false, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "hits": { "total": 43326369, "max_score": 0, "hits": [] }, "aggregations": { "uid_aggs": { "value": 12243 } } }
可以看到uid字段去重后的计数值为12243。
2)返回去重内容
方式一:top_hits聚合
POST /user_onoffline_log/ { "query": { "match_all": {} }, "aggs": { "uid_aggs": { "terms": { "field": "uid", "size": 1 }, "aggs": { "uid_top": { "top_hits": { "sort": [ { "uid": { "order": "desc" } } ], "size": 1 } } } } }, "size": 0 }
可以通过size设置显示的数量,上面aggs的size设置的1,结果:
{ "took":2318, "timed_out":false, "_shards":{ "total":200, "successful":5, "failed":195, "failures":[ { "shard":0, "index":"stamgr-logstash-2018.03.06", "node":"kUbpfLDMRLuAnloYBBRGng", "reason":{ "type":"search_parse_exception", "reason":"No mapping found for [uid] in order to sort on" } } ] }, "hits":{ "total":43326369, "max_score":0, "hits":[ ] }, "aggregations":{ "uid_aggs":{ "doc_count_error_upper_bound":1909, "sum_other_doc_count":2174528, "buckets":[ { "key":"", "doc_count":41151839, "uid_top":{ "hits":{ "total":41151839, "max_score":null, "hits":[ { "_index":"user_onoffline_log", "_type":"logs", "_id":"AWYDAu4otdWc46rphdZz", "_score":null, "_source":{ "uid":"", "ip":"", "mac":"00:5a:39:ec:1c:e4", "ap_serial_id_online":"219801A0REM173004134", "ap_mac_online":"9c:06:1b:a8:18:e0", "ap_name_online":"lFkFq_xiangyundao_sihailujiaokou_W-E_AP02", "location_online":"祥云道与四海路交口由西向东第2点", "zone_online":"祥云道", "ap_serial_id_offline":"219801A0REM173004692", "ap_mac_offline":"9c:06:1b:a8:00:00", "ap_name_offline":"lFkFq_xiangyundao_sihailujiaokou_W-E_AP03", "location_offline":"祥云道与四海路交口由西向东第3点", "zone_offline":"祥云道", "area":"开发区", "area_id":"001", "province":"", "city":"", "corp":"", "type":"3", "online_time":"201809222310", "offline_time":"201809230440", "duration":330 }, "sort":[ "" ] } ] } } } ] } } }
方式二:折叠
POST /user_onoffline_log/ { "query":{ "match_all":{ } }, "collapse":{ "field":"uid" } }
方式二较方式一:
简化;
性能好很多。
4、Java实现
1)统计去重数目
public class EsTest { public static void main(String[] args) { Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch") // 设置集群名 .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上 .build(); TransportClient client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("101.10.32.1", 9300))); CardinalityBuilder cardinalityBuilder = AggregationBuilders.cardinality("uid_aggs").field("uid"); SearchRequestBuilder request = client.prepareSearch("user_onoffline_log") .setTypes("logs") .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.boolQuery() .must(QueryBuilders.termQuery("uid", ""))) .addAggregation(cardinalityBuilder) .setSize(1); SearchResponse response = request.execute().actionGet(); List<Aggregation> aggregationList = response.getAggregations().asList(); for (Aggregation aggregation : aggregationList) { System.out.println(aggregation.getProperty("value")); } } }
2)返回去重内容
public class EsTest { public static void main(String[] args) { Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch") // 设置集群名 .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上 .build(); TransportClient client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("101.10.32.1", 9300))); AggregationBuilder aggregationBuilder = AggregationBuilders .terms("uid_aggs").field("uid").size(10000) .subAggregation(AggregationBuilders.topHits("uid_top") .addSort("offline_time", SortOrder.DESC) .setSize(1)); SearchRequestBuilder request = client.prepareSearch("user_onoffline_log") .setTypes("logs") .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.boolQuery() .must(QueryBuilders.termQuery("uid", ""))) .addAggregation(aggregationBuilder) .setSize(1); SearchResponse response = request.execute().actionGet(); Terms genders = response.getAggregations().get("uid_aggs"); for (Terms.Bucket entry : genders.getBuckets()) { TopHits top = entry.getAggregations().get("uid_top"); for (SearchHit hit : top.getHits()) { System.out.println(hit.getSource()); } } } }
版权声明
本文仅代表作者观点,不代表码农殇立场。
本文系作者授权码农殇发表,未经许可,不得转载。
已有0条评论