`
qindongliang1922
  • 浏览: 2187961 次
  • 性别: Icon_minigender_1
  • 来自: 北京
博客专栏
7265517b-f87e-3137-b62c-5c6e30e26109
证道Lucene4
浏览量:117630
097be4a0-491e-39c0-89ff-3456fadf8262
证道Hadoop
浏览量:126036
41c37529-f6d8-32e4-8563-3b42b2712a50
证道shell编程
浏览量:59985
43832365-bc15-3f5d-b3cd-c9161722a70c
ELK修真
浏览量:71374
社区版块
存档分类
最新评论

ElasticSearch之Java Api聚合分组实战

    博客分类:
  • ELK
阅读更多

最近有个日志收集监控的项目采用的技术栈是ELK+JAVA+Spring,客户端语言使用的是Java,以后有机会的话可以试一下JavaScript+Nodejs的方式,非常轻量级的组合,只不过不太适合服务化的工程,Kibana充当可视化层,功能虽然非常强大和灵活,但是需要业务人员懂Lucene的查询语法和Kibana的Dashboard仪表盘自定义功能才能玩的转,所以Kibana面向专业的开发人员和运维人员比较良好,但面向业务人员则稍微有点难度,我们这边就使用Java进行二次开发,然后前端定义几个业务人员关注的图表,然后把后端查询的数据,按照一定的维度放进去即可。

基础环境:
(1)ElasticSearch1.7.2
(2)Logstash2.2.2
(3)Kibana4.1.2
(3)JDK7
(4)Spring4.2


使用到的技术点:
(1)ElasticSearch的查询
(2)ElasticSearch的过滤
(3)ElasticSearch的日期聚合
(4)ElasticSearch的Terms聚合
(5)ElasticSearch的多级分组
(6)ElasticSearch+Logstash的时区问题

直接上代码:

package cn.bizbook.product.elk.dao.impl;

import cn.bizbook.product.elk.config.ESConf;
import cn.bizbook.product.elk.dao.ESDao;
import cn.bizbook.product.elk.utils.TimeTools;
import cn.bizbook.product.elk.vo.count.Condition;
import cn.bizbook.product.elk.vo.count.CountType;
import cn.bizbook.product.elk.vo.count.search.GroupCount;
import cn.bizbook.product.elk.vo.count.search.MonitorCount;
import org.apache.commons.lang.StringUtils;
import org.apache.lucene.queryparser.classic.QueryParserBase;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Repository;

import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by qindongliang on 2016/4/6.
 */
@Repository("esDaoImpl")
public class ESDaoImpl implements ESDao {

    private static Logger log= LoggerFactory.getLogger(ESDaoImpl.class);
    @Autowired
    private ESConf esConf;

    @Resource(name = "client")
    private  Client client;


    @Override
    public MonitorCount count() {
        MonitorCount count=new MonitorCount();
        //今天的数量
        count.setToday_count(customCount(false,"*:*"));
        //今天的入库量
        count.setToday_store_count(customCount(false,"-save:1"));
        //所有的总量
        count.setTotal_count(customCount(true,"*:*"));
        //所有的入库总量
        count.setTotal_store_count(customCount(true,"-save:1"));
        return count;
    }

    private long customCount(boolean isQueryAll, String queryString){
        try {
            //今天的开始时间 比如2016-04-01 00:00:00
            long today_start = TimeTools.getDayTimeStamp(0);
            //今天的结束时间 也就是明天的开始时间 比如2016-04-02 00:00:00
            //一闭区间一开区间即得到一天的统计量
            long today_end=TimeTools.getDayTimeStamp(1);
            StringBuffer fq = new StringBuffer();
                     fq.append("@timestamp:")
                    .append(" [ ")
                    .append(today_start)
                    .append(" TO  ")
                    .append(today_end)
                    .append(" } ");
            //构建查询请求,使用Lucene高级查询语法
            QueryBuilder query=QueryBuilders.queryStringQuery(queryString);
            //构建查询请求
            SearchRequestBuilder search = client.prepareSearch("crawl*").setTypes("logs");
            //非所有的情况下,设置日期过滤
            if(isQueryAll){
                search.setQuery(query);//查询所有
            }else {//加上日期过滤
                search.setQuery(QueryBuilders.filteredQuery(query, FilterBuilders.queryFilter(QueryBuilders.queryStringQuery(fq.toString()))));
            }
            SearchResponse r = search.get();//得到查询结果
            long hits = r.getHits().getTotalHits();//读取命中数量
            return hits;
        }catch (Exception e){
            log.error("统计日期数量出错!",e);
        }
        return 0;
    }


    @Override
    public List<GroupCount> query(Condition condition) {
        return grouyQuery(condition);
    }

    /***
     * @param c 查询的条件
     * @return 查询的结果
     */
    private List<GroupCount> grouyQuery(Condition c){
        //封装结果集
        List<GroupCount> datas=new ArrayList<>();
        //组装分组
        DateHistogramBuilder dateAgg = AggregationBuilders.dateHistogram("dateagg");
        //定义分组的日期字段
        dateAgg.field("@timestamp");
        //按天分组
        if(CountType.EACH_DAY==(c.getType())) {
            dateAgg.interval(DateHistogram.Interval.DAY);
            dateAgg.timeZone("+8:00");
            dateAgg.format("yyyy-MM-dd");
        //按小时分组
        }else if(CountType.EACH_HOUR==c.getType()){
            dateAgg.interval(DateHistogram.Interval.HOUR);
            //按小时分组,必须使用这个方法,不然得到的结果不正确
            dateAgg.postZone("+8:00");
            dateAgg.format("yyyy-MM-dd HH");
        //无效分组
        }else{
            throw new NullPointerException("无效的枚举类型");
        }
        //二级分组,统计入库的成功失败量 0 1 2 , 1为不成功
        dateAgg.subAggregation(AggregationBuilders.terms("success").field("save"));

        //查询过滤条件
        StringBuffer fq = new StringBuffer();
        //过滤时间字段
        fq.append(" +@timestamp:")
                .append(" [ ")
                .append(c.getStart().getTime())
                .append(" TO  ")
                .append(c.getEnd().getTime())
                .append(" } ");
        //过滤一级
        if(StringUtils.isNotEmpty(c.getT1())){
            fq.append(" +t1:").append(c.getT1());
        }
        //过滤二级
        if(StringUtils.isNotEmpty(c.getT2())){
            fq.append(" +t2:").append(c.getT2());
        }
        //过滤三级
        if(StringUtils.isNotEmpty(c.getT3())){
            fq.append(" +t3:").append(c.getT3());
        }
        //过滤url
        if(StringUtils.isNotEmpty(c.getSourceUrl())){
            //对url进行转义,防止查询出现错误
            fq.append(" +url:").append(QueryParserBase.escape(c.getSourceUrl()));
        }
        //过滤省份编码
        if(StringUtils.isNotEmpty(c.getProvinceCode())){
            fq.append(" +pcode:").append(c.getProvinceCode());
        }
        //过滤入库状态
        if(c.getSavaState()!=null){
            fq.append(" +save:").append(c.getSavaState().getCode());
        }
        //过滤http状态码
        if(c.getWebsiteState()!=null){
            if(!c.getWebsiteState().getCode().equals("-1")) {
                fq.append(" +httpcode:").append(c.getWebsiteState().getCode());
            }else{
                fq.append(" -httpcode:").append("(0 110 200)");
            }
        }
        //过滤配置configid
        if(StringUtils.isNotEmpty(c.getConfigId())){
            fq.append(" +cid:").append(c.getConfigId());
        }



        //查询索引
        SearchRequestBuilder search=client.prepareSearch("crawl*").setTypes("logs");
        //组装请求
        search.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(),
                FilterBuilders.queryFilter(QueryBuilders.queryStringQuery(fq.toString())
                        .defaultOperator(QueryStringQueryBuilder.Operator.AND)
                ))).addAggregation(dateAgg);
        //获取查询结果
        SearchResponse r = search.get();//得到查询结果
        //获取一级聚合数据
        Histogram h=r.getAggregations().get("dateagg");
        //得到一级聚合结果里面的分桶集合
        List<DateHistogram.Bucket> buckets = (List<DateHistogram.Bucket>) h.getBuckets();
        //遍历分桶集
        for(DateHistogram.Bucket b:buckets){
            //读取二级聚合数据集引用
            Aggregations sub = b.getAggregations();
            //获取二级聚合集合
            StringTerms count = sub.get("success");
            GroupCount groupCount=new GroupCount();
            //设置x轴分组日期
            groupCount.setGroupKey(b.getKey());
            //设置指定分组条件下入库总量
            groupCount.setTotal_count(b.getDocCount());
            //读取指定分组条件下不成功的数量
            long bad_count=count.getBucketByKey("1")==null?0:count.getBucketByKey("1").getDocCount();
            //设置指定分组条件下成功的入库量
            groupCount.setTotal_store_count(b.getDocCount()-bad_count);
            //计算成功率
            groupCount.setSuccess_rate(groupCount.getTotal_store_count()*1.0/groupCount.getTotal_count());
            //添加到集合里面
            datas.add(groupCount);
        }
        return datas;
    }



}






总结:
(1)关于时区的问题,目前发现在测试按小时,按天分组统计的时候,时区使用的方法不是一致的,而postZone这个方法,在1.5版本已经废弃,说是使用timeZone替代,但经测试发现在按小时分组的时候,使用timeZone加8个时区的并没生效,后续看下最新版本的ElasticSearch是否修复。
(2)使用Terms的聚合分组时,这个字段最好是没有分过词的,否则大量的元数据返回,有可能会发生OOM的异常
(3)在不需要评分排名查询的场景中,尽量使用filter查询,elasticsearch会缓存查询结果,从而能大幅提高检索性能

今天先总结这么多,后续有空再关注下
(1)elasticsearch中的Aggregations和Facet的区别以及对比Solr中的Group和Facet的区别
(2)在不同的聚合渠道中多级分组中是组内有序还是全局有序



有什么问题 可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。
本公众号的内容是有关搜索和大数据技术和互联网等方面内容的分享,也是一个温馨的技术互动交流的小家园



0
6
分享到:
评论

相关推荐

    ElasticSearch Java API

    **Elasticsearch Java API**是Elasticsearch与Java应用程序交互的主要工具,它允许开发者在Java环境中无缝地创建、管理和查询Elasticsearch索引。Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎,用于...

    项目实战——Spark将Hive表的数据写入ElasticSearch(Java版本)

    在本项目实战中,我们将探讨如何使用Java编程语言,结合Spark和Hive,将Hive中的数据高效地导入到ElasticSearch(ES)中,并利用ES的别名机制实现数据更新的平滑过渡。以下是对这个流程的详细解析: 1. **Hive数据...

    elasticsearch-java.rar

    Elasticsearch提供了Java API,使得Java开发者可以直接在代码中操作Elasticsearch集群,执行索引、查询、更新和删除等操作。这个API是与Elasticsearch服务器通信的主要方式,简化了开发过程。 3. **安装与配置** ...

    Elasticsearch 技术解析与实战.zip

    6 1.3.2 JSON介绍 10 1.4 安装配置 12 1.4.1 安装Java 12 1.4.2 安装Elasticsearch 12 1.4.3 配置 13 1.4.4 运行 15 1.4.5 停止 17 1.4.6 作为服务 17 1.4.7 版本升级 19 1.5 对外接口 21 1.5.1 API约定 22 1.5 .2 ...

    Elasticsearch实战与原理解析 源代码.zip

    例如,你可能会看到如何使用Java API或者curl命令来与Elasticsearch交互,创建索引并插入文档,或者执行复杂的聚合查询。 在实战部分,你可能会学习到如何处理实时数据流,比如日志分析或者实时监控。Elasticsearch...

    elasticsearch实战源码 (黄申译)

    Elasticsearch是一款开源的、分布式的全文搜索引擎,由Java编写,设计用于处理海量数据的快速检索。黄申翻译的《Elasticsearch实战》一书,深入浅出地介绍了Elasticsearch的核心概念、功能以及实际应用。书中附带的...

    elasticsearch-test.zip

    《Elasticsearch实战:基于Java的深度探索》 在当今大数据时代,Elasticsearch作为一款强大的开源全文搜索引擎,因其高效、灵活和可扩展性而备受青睐。本篇将围绕"elasticsearch-test.zip"这个压缩包文件,深入探讨...

    Java_Elasticsearch在动作书.zip

    《Java Elasticsearch实战》这本书主要探讨了如何利用Java API与Elasticsearch进行深度集成,从而实现高效的数据检索和分析。Elasticsearch是一个分布式、RESTful风格的搜索和数据分析引擎,广泛应用于日志分析、...

    Java 之 ElasticSearch7.x.x 爬虫 + 项目实战-搜索页面

    Java API是与Elasticsearch交互的主要方式之一,可以方便地创建索引、插入文档、执行查询等操作。 其次,**SpringBoot** 是Spring框架的一个简化版,它旨在简化创建独立的、生产级别的基于Spring的应用程序。通过...

    龙果学院 elasticsearch 72讲笔记

    s这门技术有点特殊,跟比如其他的像纯java的课程,比如分布式课程,或者大数据...《核心知识篇(下半季)》,包括深度讲解搜索这块技术,还有聚合分析这块技术,包括数据建模,包括java api的复杂使用,有一个项目实战s

    Lucene全文检索入门项目 Java实现Maven项目 Elasticsearch 基础实战.zip

    【标题】"Lucene全文检索入门项目 Java实现Maven项目 Elasticsearch 基础实战" 提供了一个学习如何在Java环境中运用全文检索技术的起点。这个项目涵盖了两个主要的开源工具:Lucene和Elasticsearch,它们都是业界...

    elasticsearch 6.8.0 整理.zip

    **Elasticsearch 6.8.0:核心概念与实战指南** Elasticsearch 是一个流行的开源全文搜索引擎,基于 Lucene 库,广泛用于实时数据分析、日志聚合、搜索和信息检索。版本 6.8.0 提供了稳定性和性能的优化,使其成为...

    这篇实战攻略,带你轻松入门Elastic search.pdf

    再者,Elasticsearch提供了强大的聚合分析功能,能对存储的大量数据进行近实时的统计分析。 要使用Elasticsearch,有几个最基本的概念是必须了解的,包括节点(Node)、索引(Index)、类型映射(Mapping)和文档...

    2018年最新ElasticSearch6实战教程

    1. **基础知识准备**:在学习之前,最好具备一定的Java基础,因为ElasticSearch是基于Java编写的。 2. **实践操作**:理论学习的同时,要动手实践,通过搭建自己的测试环境来加深理解。 3. **社区交流**:加入...

    elasticsearch实战及使用ppt,私有资源自己 看的

    - **Elasticsearch vs Lucene**: ES在Java之上构建,简化了Lucene的使用,支持集群,解决了Lucene的一些复杂性和限制。 - **Elasticsearch vs Solr**: ES自带分布式协调,支持实时搜索,而Solr需借助Zookeeper进行...

    elasticsearch技术解析与实战

    **Elasticsearch技术解析与实战** Elasticsearch是一款开源、分布式、实时的全文搜索引擎,它以其强大的搜索功能和高可扩展性在IT行业中备受青睐。本书"elasticsearch技术解析与实战"深入浅出地介绍了Elasticsearch...

    Elasticsearch核心技术与实战 笔记

    ### Elasticsearch核心技术与实战笔记知识点概览 #### 一、ELK Stack概述 - **定义**:ELK Stack是由Elasticsearch、Logstash和Kibana组成的开源软件集合,旨在提供全面的数据抽取、搜索分析和数据可视化解决方案。...

    狂神说ElasticsearchDemo

    【qiao-es-api】这部分可能是Elasticsearch客户端API的实现,用于和Elasticsearch服务器交互。常见的客户端API包括Java REST Client、Transport Client(已被弃用)和Jest等。此部分会涵盖以下知识点: 1. **连接...

    java的Elasticsearch02学习资料

    Elasticsearch提供了Java API,使得开发者可以方便地在Java应用程序中集成Elasticsearch功能。主要包括以下操作: - **创建索引**:使用`client.admin().indices().prepareCreate(indexName)`来创建一个新的索引。 ...

Global site tag (gtag.js) - Google Analytics