Elasticsearch aggregation filtering: the equivalent of SQL HAVING


Background

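In real business scenarios we often need aggregation filtering: group the data first, aggregate each group, and then filter on the aggregated results. Relational databases handle this with HAVING or a subquery. In Elasticsearch the same can be done with the bucket_selector pipeline aggregation.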

Business scenario

Find the customers who have placed at least 2 orders and whose average order amount is at least 100.

  • The equivalent SQL in a relational database
SELECT
    userId,
    AVG(amount) AS avgAmount,
    COUNT(*) AS orderCount
FROM type_order
GROUP BY userId
HAVING AVG(amount) >= 100 AND COUNT(*) >= 2
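For readers coming from SQL, here is a minimal in-memory Java sketch of what this query computes: group by userId, aggregate each group, then filter on the aggregated values. The `Order` and `UserStats` records, the `groupAndFilter` helper and the sample rows are hypothetical and exist only for illustration (records need Java 16+).

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class HavingSketch {
    // Hypothetical order row (userId, amount); not data from the article
    public record Order(long userId, double amount) {}

    // One aggregated group, the equivalent of one GROUP BY result row
    public record UserStats(long userId, long orderCount, double avgAmount) {}

    // GROUP BY userId, compute COUNT(*) and AVG(amount), then apply the HAVING condition
    public static List<UserStats> groupAndFilter(List<Order> orders, long minCount, double minAvg) {
        Map<Long, List<Order>> byUser = orders.stream()
                .collect(Collectors.groupingBy(Order::userId, TreeMap::new, Collectors.toList()));
        return byUser.entrySet().stream()
                .map(e -> new UserStats(
                        e.getKey(),
                        e.getValue().size(),
                        e.getValue().stream().mapToDouble(Order::amount).average().orElse(0)))
                // Like HAVING, this filter runs on the aggregated values, not on raw rows
                .filter(s -> s.orderCount() >= minCount && s.avgAmount() >= minAvg)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
                new Order(1L, 150), new Order(1L, 50),  // 2 orders, average 100: kept
                new Order(2L, 500),                      // only 1 order: dropped
                new Order(3L, 40), new Order(3L, 60));  // average 50: dropped
        groupAndFilter(orders, 2, 100).forEach(System.out::println);
    }
}
```

Sorting the groups with a `TreeMap` only makes the output order deterministic; it is not part of the HAVING semantics.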

  • The Elasticsearch query
GET index_test/type_order/_search
{
  "size": 0,
  "aggs": {
    "groupUserId": {
      "terms": {
        "field": "userId"
      },
      "aggs": {
        "avgAmount": {
          "avg": {
            "field": "amount"
          }
        },
        "having": {
          "bucket_selector": {
            "buckets_path": {
              "orderCount": "_count",
              "avgAmount": "avgAmount"
            },
            "script": {
              "source": "params.avgAmount >= 100 && params.orderCount >= 2"
            }
          }
        }
      }
    }
  }
}
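`bucket_selector` is a parent pipeline aggregation: it runs after the `terms` buckets and their `avg` sub-aggregation have been computed, binds each `buckets_path` entry to a script variable under `params` (`_count` is the special path for the bucket's document count), and removes every bucket for which the script returns false. The sketch below models that behavior in plain Java. It is not Elasticsearch's implementation: the `Bucket` record and the `select` method are hypothetical, and a Java `Predicate` stands in for the Painless script.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class BucketSelectorSketch {
    // Simplified stand-in for one terms bucket: key, document count, numeric sub-aggregation values
    public record Bucket(long key, long docCount, Map<String, Double> metrics) {}

    // Resolve one buckets_path; "_count" means the bucket's document count.
    // Assumes every other path names a metric that exists in the bucket.
    static double resolve(Bucket bucket, String path) {
        return "_count".equals(path) ? bucket.docCount() : bucket.metrics().get(path);
    }

    // Bind each buckets_path entry into params, run the predicate, keep buckets where it is true
    public static List<Bucket> select(List<Bucket> buckets,
                                      Map<String, String> bucketsPath,
                                      Predicate<Map<String, Double>> script) {
        List<Bucket> kept = new ArrayList<>();
        for (Bucket bucket : buckets) {
            Map<String, Double> params = new HashMap<>();
            bucketsPath.forEach((name, path) -> params.put(name, resolve(bucket, path)));
            if (script.test(params)) {
                kept.add(bucket);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, String> bucketsPath = Map.of("orderCount", "_count", "avgAmount", "avgAmount");
        List<Bucket> buckets = List.of(
                new Bucket(1000L, 2, Map.of("avgAmount", 275.0)),  // values from the response below
                new Bucket(1001L, 1, Map.of("avgAmount", 300.0))); // hypothetical: only one order
        // Same condition as the Painless script in the query
        List<Bucket> kept = select(buckets, bucketsPath,
                p -> p.get("avgAmount") >= 100 && p.get("orderCount") >= 2);
        kept.forEach(System.out::println);
    }
}
```

Only bucket 1000 survives, which matches the single bucket in the response.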

  • Response
{
  "took": 16,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "groupUserId": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": 1000,
          "doc_count": 2,
          "avgAmount": {
            "value": 275
          }
        }
      ]
    }
  }
}


  • Implementation with the Java API
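// Package names follow the 6.x Java client; several of them moved in 7.x.
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorBuilders;
import org.elasticsearch.search.aggregations.pipeline.bucketselector.BucketSelectorPipelineAggregationBuilder;

String termsAlias = "userIdGroup";

// Group by userId. Without an explicit size, terms returns only 10 buckets; Integer.MAX_VALUE
// (2147483647) fetches every group at once, but it is expensive on high-cardinality fields,
// and bucket_selector can only filter the buckets that survive this limit.
// BucketOrder replaced Terms.Order in 6.0; here buckets are sorted by userId ascending.
TermsAggregationBuilder termsAgg = AggregationBuilders.terms(termsAlias)
        .field("userId")
        .size(Integer.MAX_VALUE)
        .order(BucketOrder.key(true));
// Average amount per user; the document count (_count) is built in and needs no sub-aggregation
termsAgg.subAggregation(AggregationBuilders.avg("avgAmount").field("amount"));
// Map script variable names to buckets_path values for the bucket filter below
Map<String, String> bucketsPathsMap = new HashMap<>(8);
bucketsPathsMap.put("orderCount", "_count");
bucketsPathsMap.put("avgAmount", "avgAmount");
// Filter script: a bucket is kept only when it evaluates to true (the HAVING condition)
Script script = new Script("params.avgAmount >= 100 && params.orderCount >= 2");

// Build the bucket selector and attach it under the terms aggregation
BucketSelectorPipelineAggregationBuilder bs =
        PipelineAggregatorBuilders.bucketSelector("having", bucketsPathsMap, script);

termsAgg.subAggregation(bs);

// client is an existing org.elasticsearch.client.Client (e.g. a TransportClient)
SearchRequestBuilder sb = client.prepareSearch("index_test").setTypes("type_order");
SearchResponse sr = sb.setSize(0).addAggregation(termsAgg).execute().actionGet();
System.out.println("Query:");
System.out.println(sb);

// Read the filtered buckets through the Terms/Avg interfaces instead of internal classes
Terms terms = sr.getAggregations().get(termsAlias);
for (Terms.Bucket bucket : terms.getBuckets()) {
    Avg avgAmount = bucket.getAggregations().get("avgAmount");
    System.out.println("-------------------------");
    System.out.println("userId = " + bucket.getKey());
    System.out.println("count = " + bucket.getDocCount());
    System.out.println("avgAmount = " + avgAmount.getValue());
    System.out.println("-------------------------");
}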
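The terms size setting matters because the selector only sees the buckets that terms actually returns. It cannot bring back a qualifying user that was cut off by size; with the default size of 10, a matching user ranked 11th would silently disappear. The small Java model below shows that ordering. The `TruncateThenSelect` class and the bucket values are hypothetical, and it assumes the default terms order (doc_count descending); the key tie-break is added only to make the demo deterministic.

```java
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class TruncateThenSelect {
    // Hypothetical per-user bucket: userId, order count, average amount
    public record Bucket(long key, long docCount, double avgAmount) {}

    // Model of the pipeline: terms keeps the top `size` buckets, then the selector filters them
    public static List<Bucket> termsThenSelect(List<Bucket> all, int size) {
        return all.stream()
                // doc_count descending; key ascending only breaks ties for a stable demo
                .sorted(Comparator.comparingLong(Bucket::docCount).reversed()
                        .thenComparingLong(Bucket::key))
                .limit(size)                                             // terms "size"
                .filter(b -> b.avgAmount() >= 100 && b.docCount() >= 2)  // bucket_selector script
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Bucket> all = List.of(
                new Bucket(1L, 5, 50.0),   // many orders, low average
                new Bucket(2L, 4, 80.0),   // many orders, low average
                new Bucket(3L, 2, 200.0)); // the only user that satisfies the condition
        System.out.println("size=2:  " + termsThenSelect(all, 2));
        System.out.println("size=10: " + termsThenSelect(all, 10));
    }
}
```

With size 2 the result is empty even though user 3 qualifies; with size 10 user 3 is returned.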


  • Published 2018-09-27 17:49
  • Category: Big Data

  • Author: 张鹏, Big Data Engineer
