Elasticsearch 中如何巧妙地使用聚合函数达到数据库中having的效果

elasticsearch高级聚合对于初级来说,确实有些复杂,但是经历过之后你反而会发现其实并没有那么复杂,只是自己接触的太少了,努力学习吧

在现实开发中难免会遇到一些业务场景,通过聚合得出相应的频次并进行筛选

1.使用 minDocCount  直接上代码,大家可自行根据业务场景更改

[java] view plain copy
  1. //正确答案  
  2. SearchRequestBuilder search = transportlient.prepareSearch("bigdata_idx_2").setTypes("captureCompare");  
  3. FilterAggregationBuilder sub= AggregationBuilders.filter("channel_longitudeC").filter(QueryBuilders.rangeQuery("fcmp_time").from(startTime).to(endTime));  
  4. //分组字段是id,排序由多个字段排序组成  
  5. TermsBuilder tb= AggregationBuilders.terms("fcmp_fobj_id").field("fcmp_fobj_id").valueType(Terms.ValueType.STRING).order(Terms.Order.compound(  
  6. Terms.Order.aggregation("channel_longitudeC",false)//先按count,降序排  
  7. //如果count相等情况下,使用code的和排序  
  8. ));  
  9. //求和字段1  
  10. ValueCountBuilder sb= AggregationBuilders.count("channel_longitudeC");  
  11. tb.subAggregation(sb).minDocCount(400);//添加到分组聚合请求中  
  12.   
  13. //将分组聚合请求插入到主请求体重  
  14. // search.setPostFilter()  
  15. search.addAggregation(tb);  


2.稍微复杂些,还有另外一种场景,就是我聚合的同时,需要把其他相应的字段信息也同时返回出来 Top Hits Aggregation

类似SQL : select *,count(*) from XXX group by a ......

[java] view plain copy
  1. SearchResponse response = null;  
  2.         SearchRequestBuilder responsebuilder = transportlient.prepareSearch("syrk_bigdata_capturecmp_passer_idx")  
  3.                 .setTypes("captureCompare").setFrom(0).setSize(100000);  
  4.         AggregationBuilder aggregation = AggregationBuilders  
  5.                 .terms("agg")  
  6.                 .field("idNumb")  
  7.                 .subAggregation(  
  8.                         AggregationBuilders.topHits("top").setFrom(0)  
  9.                                 .setSize(1)).size(100000);  
  10.         response = responsebuilder.setQuery(QueryBuilders.boolQuery()  
  11.                 .must(QueryBuilders.rangeQuery("fcapTime").from(Long.valueOf(startTime)).to(Long.valueOf(endTime))))  
  12.                 .addSort("idNumb", SortOrder.ASC)  
  13.                 .addAggregation(aggregation)// .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)  
  14.                 .setExplain(true).execute().actionGet();  
  15.         <strong>SearchHits hits = response.getHits();//最后取结果时不要使用此hits</strong>  
  16.         Terms agg = response.getAggregations().get("agg");  
  17.         long end = System.currentTimeMillis();  
  18.         System.out.println("ES run time: " + (end - start) + "ms");  
  19.         /**插入之前首先清除当天数据,以免重复添加**/  
  20.         SyrkRegionFcapperPasserStatistics temp = new SyrkRegionFcapperPasserStatistics();  
  21.         temp.setDate(Long.valueOf(startTime));  
  22.         try{  
  23.             syrkRegionFcapperPasserStatisticsService.deletePasser(temp);  
  24.             for (Terms.Bucket entry : agg.getBuckets()) {  
  25.                 String key = (String) entry.getKey(); // bucket key  
  26.                 long docCount = entry.getDocCount(); // Doc count  
  27.                   
  28.                 // We ask for top_hits for each bucket  
  29.                 TopHits topHits = entry.getAggregations().get("top");  
  30.                 for (SearchHit hit : topHits.getHits().getHits()) {  
  31.                     compareUuid= (String) hit.getSource().get("idNumb");  
  32.                       
  33.                 }  
  34.                 /** 读取数据写入mysql **/  
  35.             }  
  36.             logger.info("All Analysis Data has insert : date is "+startTime);  
  37.         }catch (Exception e){  
  38.             logger.info("Analysis Result Data failed ,date is "+startTime);  
  39.         }  



聚合后的总数取相应的 docCount  其他字段信息从hits 中获取

切记,不要取最外层的hits ,因为外层的hits 和聚合的hits数量会不一致,遍历取回造成数据不一致

  • 发表于 2018-04-30 08:47
  • 阅读 ( 620 )
  • 分类:大数据

0 条评论

请先 登录 后评论
不写代码的码农
张鹏

大数据工程师

94 篇文章

作家榜 »

  1. 张鹏 94 文章
  2. 0 文章
  3. 赵科 0 文章
  4. 王孖珺397954227 0 文章