`

Elasticsearch聚合查询案例分享

阅读更多
Elasticsearch聚合查询案例分享

1.案例介绍
统计特定时间范围内每个应用的总访问量、访问成功数、访问失败数,每个应用请求响应时间分段统计(1秒内,1-3秒,3-5秒,5秒以上

2.准备工作

参考文档《高性能elasticsearch ORM开发库使用介绍》中的第1章节和第2章节,在自己的工程中导入bboss es依赖包和配置es参数

3.定义统计dsl
在源码目录下新建文件esmapper/estrace/ESTracesMapper.xml,内容如下

<properties>
    <!--
    应用汇总统计:总访问量,成功数,失败数
   bboss es dao通过名称applicationSumStatic引用脚本
    -->
    <property name="applicationSumStatic">
        <![CDATA[
        {
            "query": {
                "bool": {
                    "filter": [
                        #if($channelApplications && $channelApplications.size() > 0)
                        {
                            "terms": {
                                "applicationName.keyword": [
                                #foreach($application in $channelApplications)
                                   #if($velocityCount > 0),#end $application.applicationName
                                #end
                                ]
                            }
                        },
                        #end
                        {"range": {
                                "startTime": {
                                    "gte": #[startTime],##统计开始时间
                                    "lt": #[endTime]  ##统计截止时间
                                }
                            }
                        }
                    ]
                }
            },
            "size":0,
            "aggs": {
                "applicationsums": {
                      "terms": {
                        "field": "applicationName.keyword",##按应用名称进行统计计数
                        "size":10000
                      },
                      "aggs":{
                            "successsums" : {
                                "terms" : {
                                    "field" : "err" ##按err标识统计每个应用的成功数和失败数,0标识成功,1标识失败
                                }
                            },
                            "elapsed_ranges" : {
                                "range" : {
                                    "field" : "elapsed", ##按响应时间分段统计
                                    "keyed" : true,
                                    "ranges" : [
                                        { "key" : "1秒", "to" : 1000 },
                                        { "key" : "3秒", "from" : 1000, "to" : 3000 },
                                        { "key" : "5秒", "from" : 3000, "to" : 5000 },
                                        { "key" : "5秒以上", "from" : 5000 }
                                    ]
                                }
                            }
                      }
                }
            }
        }
        ]]>
    </property>
</properties>

4.编写统计dao及统计方法
public class TraceESDao {    
    public List<ApplicationStatic> getApplicationSumStatic(TraceExtraCriteria traceExtraCriteria){
    	init();
    	//返回json统计报文,调试用,一遍根据json报文组装统计结果列表
//		String response = clientUtil.executeRequest("trace-*/_search",
//                                  "applicationSumStatic",traceExtraCriteria);
		//根据条件进行统计,在对象traceExtraCriteria中指定开始时间和结束时间
		MapRestResponse restResponse = clientUtil.search("trace-*/_search",
				                      "applicationSumStatic",traceExtraCriteria);

		//组装统计结果
		//获取应用统计列表,包含每个应用的名称、总访问量以及成功数和失败数
		List<Map<String,Object>> appstatics = (List<Map<String,Object>>)restResponse.getAggBuckets("applicationsums");
		if(appstatics != null && appstatics.size() > 0) {
			List<ApplicationStatic> applicationStatics = new ArrayList<ApplicationStatic>(appstatics.size());
			ApplicationStatic applicationStatic = null;
			for (int i = 0; i < appstatics.size(); i++) {
				applicationStatic = new ApplicationStatic();
				Map<String, Object> map = appstatics.get(i);
				//应用名称
				String appName = (String) map.get("key");
				applicationStatic.setApplicationName(appName);
				//应用总访问量
				Long totalsize = ResultUtil.longValue( map.get("doc_count"),0l);
				applicationStatic.setTotalSize(totalsize);
				//获取成功数和失败数
				List<Map<String, Object>> appstatic = (List<Map<String, Object>>)ResultUtil.getAggBuckets(map, "successsums");

				/**
				 "buckets": [
				 {
				 "key": 0,
				 "doc_count": 30
				 }
				 ]
				 */
				//key 0
				Long success = 0l;//成功数
				Long failed = 0l;//失败数
				for (int j = 0; j < appstatic.size(); j++) {
					Map<String, Object> stats = appstatic.get(j);
					Integer key = (Integer) stats.get("key");//成功和错误标识
					if (key == 0)//成功
						success = ResultUtil.longValue( stats.get("doc_count"),0l);
					else if (key == 1)//失败
						failed = ResultUtil.longValue( stats.get("doc_count"),0l);
				}
				applicationStatic.setSuccessCount(success);
				applicationStatic.setFailCount(failed);
				List<ApplicationPeriodStatic> applicationPeriodStatics = new ArrayList<ApplicationPeriodStatic>(4);
				ApplicationPeriodStatic applicationPeriodStatic = null;
				//获取响应时间分段统计信息
				Map<String, Map<String, Object>> appPeriodstatic = (Map<String, Map<String, Object>>)ResultUtil.getAggBuckets(map, "elapsed_ranges");
				//1秒
				Map<String, Object> period = appPeriodstatic.get("1秒");
				applicationPeriodStatic = new ApplicationPeriodStatic();
				applicationPeriodStatic.setPeriod("1秒");
				applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
				applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),1000));
				applicationPeriodStatics.add(applicationPeriodStatic);

				//3秒
				period = appPeriodstatic.get("3秒");
				applicationPeriodStatic = new ApplicationPeriodStatic();
				applicationPeriodStatic.setPeriod("3秒");
				applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
				applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),1000));
				applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),3000));
				applicationPeriodStatics.add(applicationPeriodStatic);

				//5秒
				period = appPeriodstatic.get("5秒");
				applicationPeriodStatic = new ApplicationPeriodStatic();
				applicationPeriodStatic.setPeriod("5秒");
				applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
				applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),3000));
				applicationPeriodStatic.setTo(ResultUtil.intValue(period.get("to"),5000));
				applicationPeriodStatics.add(applicationPeriodStatic);

				//5秒以上
				period = appPeriodstatic.get("5秒以上");
				applicationPeriodStatic = new ApplicationPeriodStatic();
				applicationPeriodStatic.setPeriod("5秒以上");
				applicationPeriodStatic.setDocCount(ResultUtil.longValue(period.get("doc_count"),0l));
				applicationPeriodStatic.setFrom(ResultUtil.intValue(period.get("from"),5000));
				applicationPeriodStatics.add(applicationPeriodStatic);

				applicationStatic.setApplicationPeriodStatics(applicationPeriodStatics);
				applicationStatics.add(applicationStatic);

			}
			//返回统计结果
			return applicationStatics;
		}
		return null;
	}
}

5.执行测试用例
@Test
	public void testAppliationstaticList(){
		TraceExtraCriteria traceExtraCriteria = new TraceExtraCriteria();
		traceExtraCriteria.setStartTime(1516304868072l);//以long方式设置统计开始时间,Date的getTime方法获取
		traceExtraCriteria.setEndTime(1516349516377l);//以long方式设置统计截止时间,Date的getTime方法获取
		TraceESDao traceESDao = new TraceESDao();//定义dao组件
		List<ApplicationStatic> applicationStatics = traceESDao.getApplicationSumStatic(traceExtraCriteria);
		System.out.println(applicationStatics.size());
	}


6.获取元数据信息的测试方法
@Test
	public void testAppStatic(){
		TraceExtraCriteria traceExtraCriteria = new TraceExtraCriteria();
		traceExtraCriteria.setStartTime(1516304868072l);
		traceExtraCriteria.setEndTime(1516349516377l);
		ClientInterface clientUtil = ElasticSearchHelper.getConfigRestClientUtil("esmapper/estrace/ESTracesMapper.xml");
		//通过下面的方法先得到查询的json报文,然后再通过MapRestResponse查询遍历结果,调试的时候打开String response的注释
		//String response = clientUtil.executeRequest("trace-*/_search","applicationSumStatic",traceExtraCriteria);
		//System.out.println(response);
		MapRestResponse restResponse = clientUtil.search("trace-*/_search","applicationSumStatic",traceExtraCriteria);

		List<Map<String,Object>> appstatics = restResponse.getAggBuckets("applicationsums",new ESTypeReference<List<Map<String,Object>>>(){});
		int doc_count_error_upper_bound = restResponse.getAggAttribute("applicationsums","doc_count_error_upper_bound",int.class);
		int sum_other_doc_count = restResponse.getAggAttribute("applicationsums","sum_other_doc_count",int.class);
		System.out.println("doc_count_error_upper_bound:"+doc_count_error_upper_bound);
		System.out.println("sum_other_doc_count:"+sum_other_doc_count);
		for(int i = 0; i < appstatics.size(); i ++){
			Map<String,Object> map = appstatics.get(i);
			//应用名称
			String appName = (String)map.get("key");
			//应用总访问量
			int totalsize =  (int)map.get("doc_count");
			//获取成功数和失败数
			List<Map<String,Object>> appstatic = ResultUtil.getAggBuckets(map ,"successsums",new ESTypeReference<List<Map<String,Object>>>(){});
			  doc_count_error_upper_bound = ResultUtil.getAggAttribute(map ,"successsums","doc_count_error_upper_bound",int.class);
			  sum_other_doc_count = ResultUtil.getAggAttribute(map ,"successsums","sum_other_doc_count",int.class);
			System.out.println("doc_count_error_upper_bound:"+doc_count_error_upper_bound);
			System.out.println("sum_other_doc_count:"+sum_other_doc_count);
			/**
			"buckets": [
			{
				"key": 0,
					"doc_count": 30
			}
                        ]
			 */
			//key 0
			int success = 0;//成功数
			int failed = 0;//失败数
			for(int j = 0; j < appstatic.size(); i ++){
				Map<String,Object> stats = appstatic.get(i);
				int key = (int) stats.get("key");//成功和错误标识
				if(key == 0)
                	success = (int)stats.get("doc_count");
				else if(key == 1)
					failed = (int)stats.get("doc_count");
			}

		}


	}


7.相关资料
高性能elasticsearch ORM开发库使用介绍

https://my.oschina.net/bboss/blog/1556866

bboss elasticsearch交流群:166471282
1
0
分享到:
评论

相关推荐

    Elasticsearch官方提供数据案例account.json

    Elasticsearch支持多种查询语法,包括简单查询、过滤器、聚合、脚本字段和自定义评分等。 **Kibana可视化** 配合Kibana,Elasticsearch的可视化工具,用户可以将这些数据转化为直观的图表和仪表板。Kibana提供数据...

    ElasticSearch 学习案例

    在这个学习案例中,我们将深入探讨 Elasticsearch 的结构化搜索、过滤以及聚合数据分析。 首先,让我们来看一下如何在 Elasticsearch 中插入和检索数据。这里展示的是使用 Bulk API 插入测试帖子数据。每个帖子包含...

    ElasticSearch和activiti案例

    **Elasticsearch与Activiti深度整合案例** 在现代企业信息化建设中,Elasticsearch和Activiti作为两个重要的技术组件,分别扮演着数据搜索与流程管理的重要角色。Elasticsearch是一款强大的分布式搜索引擎,以其...

    elasticsearch+spring小案例

    在本"elasticsearch+spring小案例"中,我们将探讨如何集成Elasticsearch 6.1.2版本与Spring框架,以便在Java应用中利用Elasticsearch的强大搜索功能。Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎,...

    Elasticsearch实战与原理解析 源代码.zip

    例如,你可能会看到如何使用Java API或者curl命令来与Elasticsearch交互,创建索引并插入文档,或者执行复杂的聚合查询。 在实战部分,你可能会学习到如何处理实时数据流,比如日志分析或者实时监控。Elasticsearch...

    elasticsearch数据库使用案例.docx

    ### Elasticsearch在各领域中的应用案例 #### 一、日志和事件数据分析 **案例背景**: 随着业务规模的不断扩大和技术架构的复杂化,对于企业来说,有效地管理和分析大量的日志数据变得至关重要。Elasticsearch ...

    Elasticsearch技术解析与实战+Elasticsearch权威指南

    《Elasticsearch 权威指南(中文版)清晰PDF》则更深入地探讨了高级特性和最佳实践,包括搜索算法、集群管理、安全性以及复杂的查询和聚合操作。 总的来说,这两本书结合阅读,将使你对Elasticsearch有全面而深入的...

    elasticsearch数据库下载、配置、使用案例

    本资源是一个全面指南,涵盖了Elasticsearch数据库的下载、配置到高级使用案例,旨在帮助初学者和开发者快速掌握Elasticsearch的强大功能。 Elasticsearch,作为一款流行的开源搜索引擎,基于Lucenece开发,设计...

    Elasticsearch权威指南高清中文版PDF

    - **搜索与查询**:介绍查询DSL,学习如何编写复杂的搜索语句,包括匹配、范围、布尔、聚合等查询。 4. **高级特性** - **分布式搜索**:了解Elasticsearch如何在集群中分配和执行搜索请求,以及负载均衡机制。 ...

    Elasticsearch 案例

    Elasticsearch(简称ES)是一款基于Lucene的分布式、实时的全文搜索引擎,它不仅提供了搜索功能,还支持数据分析、实时聚合等高级特性。在这个案例中,我们将探讨如何利用Elasticsearch开发一个简易的搜索引擎,实现...

    elasticsearch数据库下载、配置、使用案例的简单分享.docx

    ### Elasticsearch 数据库下载、配置及使用案例详解 #### 一、Elasticsearch简介 Elasticsearch 是一个基于 Lucene 的高性能、分布式、多租户能力的全文搜索引擎。它提供了丰富的功能,包括HTTP web接口和无模式的...

    Elasticsearch 的分享

    ### Elasticsearch 分享知识点总结 #### 一、Elasticsearch 概览 **定义与特性:** Elasticsearch(简称 ES)是一款开源的、基于 Lucene 的搜索引擎,它提供了分布式的多用户能力的全文搜索引擎,基于 RESTful Web...

    elasticsearch.zip

    在本案例中,"elasticsearch.zip" 文件是与ThinkPHP (TP) 框架相关的,特别是TP5版本,它被设计为可以移植到较早的TP3版本上。这表明开发者希望在不同的TP版本中使用Elasticsearch的功能,以增强应用程序的数据搜索...

    elasticsearch 性能测试

    Elasticsearch 是一个分布式、全文检索的搜索引擎,广泛应用于数据分析、日志聚合和实时搜索场景。为了确保系统在高负载下仍能保持高效稳定,性能测试是至关重要的。本篇文章将围绕如何对 Elasticsearch 进行性能...

    elasticsearch6.1.0+elasticsearch head

    - **搜索功能**:Elasticsearch 提供了强大的全文搜索、过滤、排序和聚合功能,支持复杂查询语句和实时返回结果。 - **分布式特性**:Elasticsearch 采用分布式架构,支持自动故障恢复和数据平衡,保证了系统的高...

    Elasticsearch技术解析与实战_高清 带索引书签目录_朱林(著)

    Elasticsearch(简称ES)是一款基于Lucene的分布式、RESTful搜索和分析引擎,广泛应用于大数据处理、日志分析、实时搜索等领域。这本书针对不同层次的读者,提供了深入浅出的技术解析和实战指导。 **1. Elastic...

    Elasticsearch集成Hadoop最佳实践

    ES-Hadoop模块使得数据可以轻松地在Hadoop生态系统中传输和分析,并将结果存储在Elasticsearch中进行实时查询和搜索。 Elasticsearch Hadoop集成的最佳实践主要包括以下几个方面: 1. 数据导入与导出:ES-Hadoop...

    elasticsearch权威指南-中文

    7. **性能调优**:分享Elasticsearch性能优化技巧,如调整内存设置、优化索引设置、合理分配硬件资源等,提升系统运行效率。 8. **集群管理**:讲解如何搭建和管理Elasticsearch集群,包括跨节点通信、数据路由、...

    Elasticsearch初识与简单案例.pdf

    ### Elasticsearch 初识与简单案例 #### 一、Elasticsearch简介 Elasticsearch 是一款基于 Lucene 的分布式全文搜索引擎,具有高度可扩展性及灵活性。它不仅支持文本搜索,还能进行复杂的数据分析任务,因此在众多...

    elasticsearch数据库下载、配置、使用案例&项目源码

    ### Elasticsearch 数据库下载、配置、使用案例及项目源码详解 #### 一、Elasticsearch 数据库下载 Elasticsearch 是一款广泛使用的分布式搜索引擎和分析引擎,它基于 Lucene 构建并支持实时分析。Elasticsearch ...

Global site tag (gtag.js) - Google Analytics