1.等价的sql
SELECT DISTINCT field1,field2 FROM test_index.test_type
等价于
SELECT field1,field2 FROM test_index.test_type GROUP BY field1,field2
2.而group by的查询,在es中我们可以用Aggregation(聚合)去实现,等价的DSL查询语句如下:
POST /test_index/test_type/_search
{
"from": 0,
"size": 0,
"aggregations": {
"field1": {
"terms": {
"field": "field1",
"size": 2147483647
},
"aggregations": {
"field2": {
"terms": {
"field": "field2",
"size": 2147483647
}
}
}
}
}
}
3.java的实现:
import com.google.common.collect.ArrayListMultimap;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Stack;
import java.util.stream.Collectors;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
/**
*
* @author zhongchenghui
*/
public class EsSearch {
private static final String CLUSTER_NAME = "test_cluster";
private static final String ES_ADDRESSES = "192.168.12.1,192.168.12.2,192.168.12.3";
private static final String INDEX_NAME = "test_index";
private static final Client ES_CLIENT = ESClientFactory.newInstance(CLUSTER_NAME, ES_ADDRESSES);
/**
* 根据多个字段group by的查询
*
* @param type
* @param groupColumnsNames
* @return
* @throws Exception
*/
public List<Map<String, String>> getRowsByGroup(String type, String... groupColumnsNames) throws Exception {
TermsBuilder allTermsBuilder = createTermsAggregationBuilder(groupColumnsNames);
SearchResponse response = createSearchRequestBuilder(type, groupColumnsNames).setSize(0).addAggregation(allTermsBuilder).execute().actionGet();
List<Map<String, String>> rows = new ArrayList<>();
Terms agg = response.getAggregations().get(groupColumnsNames[0]);
int i = 0;
agg.getBuckets().stream().forEach((bucket) -> {
rows.addAll(getFlatBucket(i, bucket, groupColumnsNames));
});
return rows;
}
/**
* 逐层创建AggregationBuilder
* (此处Integer.MAX_VALUE是逐层分组的最大组数)
* @param groupColumnsNames
* @return
* @throws Exception
*/
private TermsBuilder createTermsAggregationBuilder(String... groupColumnsNames) {
TermsBuilder allTermsBuilder = AggregationBuilders.terms(groupColumnsNames[0]).field(groupColumnsNames[0]).size(Integer.MAX_VALUE);
TermsBuilder tmpTermsBuilder = allTermsBuilder;
for (int i = 1; i < groupColumnsNames.length; i++) {
TermsBuilder termsBuilder = AggregationBuilders.terms(groupColumnsNames[i]).field(groupColumnsNames[i]).size(Integer.MAX_VALUE);
tmpTermsBuilder.subAggregation(termsBuilder);
tmpTermsBuilder = termsBuilder;
}
return allTermsBuilder;
}
/**
* 创建查询请求的Builder
*
* @param type
* @param groupColumnsNames
* @return
* @throws Exception
*/
private SearchRequestBuilder createSearchRequestBuilder(String type, String... columnNames) {
SearchRequestBuilder searchRequestBuilder = ES_CLIENT.prepareSearch(INDEX_NAME).setTypes(type).setSize(50000);
if (Objects.nonNull(columnNames) && columnNames.length > 0) {
searchRequestBuilder.addFields(columnNames);
}
return searchRequestBuilder;
}
/**
* 用堆栈将respone中的聚合结果拉平返回(广度优先遍历)
*
* @param layer
* @param bucket
* @param groupColumnsNames
* @return
*/
private List<Map<String, String>> getFlatBucket(int layer, Terms.Bucket bucket, String... groupColumnsNames) {
ArrayListMultimap<BucketNode, BucketNode> bucketRowMultimap = ArrayListMultimap.create();
Stack<BucketNode> nodeStack = new Stack<>();
BucketNode bucketNode = new BucketNode(layer, groupColumnsNames[layer], bucket);
nodeStack.add(bucketNode);
bucketRowMultimap.put(bucketNode, bucketNode);
while (!nodeStack.isEmpty()) {
bucketNode = nodeStack.pop();
List<BucketNode> childerNodes = getChildrenBucketNodes(bucketNode, groupColumnsNames);
if (childerNodes != null && !childerNodes.isEmpty()) {
List<BucketNode> parentRoute = bucketRowMultimap.removeAll(bucketNode);
for (BucketNode child : childerNodes) {
nodeStack.push(child);
bucketRowMultimap.putAll(child, parentRoute);
bucketRowMultimap.put(child, child);
}
}
}
return convertToRows(bucketRowMultimap.asMap().values());
}
/**
* 获得下一层Bucket的节点列表
*
* @param parentNode
* @param groupColumnsNames
* @return
*/
private List<BucketNode> getChildrenBucketNodes(BucketNode parentNode, String... groupColumnsNames) {
int currentLayer = parentNode.layer + 1;
if (currentLayer < groupColumnsNames.length) {
String currentAggName = groupColumnsNames[currentLayer];
Terms currentAgg = parentNode.bucket.getAggregations().get(currentAggName);
if (Objects.nonNull(currentAgg)) {
return currentAgg.getBuckets().stream().map(bucket -> new BucketNode(currentLayer, currentAggName, bucket)).collect(Collectors.toList());
}
}
return null;
}
private List<Map<String, String>> convertToRows(Collection<Collection<BucketNode>> bucketRoutes) {
return bucketRoutes.stream().map(bucketRoute -> convertToRow(bucketRoute)).collect(Collectors.toList());
}
private Map<String, String> convertToRow(Collection<BucketNode> bucketRoute) {
Map<String, String> row = new HashMap<>();
bucketRoute.stream().forEach(bucketNode -> row.put(bucketNode.aggName, bucketNode.bucket.getKeyAsString()));
return row;
}
class BucketNode {
int layer;
String aggName;
Terms.Bucket bucket;
public BucketNode(int layer, String aggName, Terms.Bucket bucket) {
BucketNode.this.layer = layer;
BucketNode.this.aggName = aggName;
BucketNode.this.bucket = bucket;
}
@Override
public String toString() {
return "BucketNode{" + "layer=" + layer + ", aggName=" + aggName + ", bucket_key=" + bucket.getKeyAsString() + '}';
}
}
}
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* @author zhongchenghui
*/
public class ESClientFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(ESClientFactory.class);
private static final ConcurrentHashMap<String, Client> CLIENT_CACHE = new ConcurrentHashMap<>();
public static Client newInstance(String clusterName, String hostStr) {
Client client = CLIENT_CACHE.get(clusterName);
if (client == null) {
Map<String, Integer> addressMap = ESClientFactory.getESAddressMap(hostStr);
Settings settings = Settings.settingsBuilder().put("cluster.name", clusterName).build();
TransportClient newClient = TransportClient.builder().settings(settings).build();
addressMap.keySet().forEach((host) -> {
try {
newClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(host), addressMap.get(host)));
} catch (UnknownHostException ex) {
LOGGER.error("Init ES client failure,cluster name is:{},Error:{}", clusterName, ex);
}
});
client = newClient;
CLIENT_CACHE.put(clusterName, newClient);
}
return client;
}
private static Map<String, Integer> getESAddressMap(String hostStr) {
Map<String, Integer> hostMap = new HashMap<>();
String[] hosts = hostStr.split(",");
for (String host : hosts) {
String[] hostPort = host.trim().split(":");
Integer port = hostPort.length < 2 ? 9300 : Integer.valueOf(hostPort[1]);
hostMap.put(hostPort[0], port);
}
return hostMap;
}
}
4.存在的问题:
a.因为实现的方式是一层层往下聚合,当es中的document出现field1的字段为null的时候,该条件就不会往下聚合,即使该document的field2字段有值。(可用指定字符代替null来解决这个问题)
b.不适合于数据量太大表,3中的代码要求最大每个字段的分组数不能大于Integer.MAX_VALUE
c.返回的字段只能是参与group by的字段
分享到:
相关推荐
BoolQuery是Elasticsearch中的一种查询方式,使用must、should、must_not三个关键词来实现查询。must表示文档必须完全匹配条件,should表示文档至少满足一个条件,must_not表示文档必须不匹配条件。 在查询时,...
Elasticsearch查询客户端是用于与ES服务器通信的软件,它们提供了多种语言的API,允许开发者以编程方式执行索引、搜索、更新和删除等操作。常见的Elasticsearch客户端包括: - **Jest**:一个轻量级的Java REST...
方法如果传总页数了,es就不用查询总页数,直接通过开始位置到结束位置取数即可
在 Java 中,我们可以使用 Elasticsearch 的 Java 客户端来实现分组聚合查询。 Elasticsearch 的聚合查询 Elasticsearch 的聚合查询可以对数据进行分组、聚合、排序等操作。聚合查询可以根据不同的字段进行分组,...
在本教程中,我们将深入探讨Elasticsearch的简单查询和聚合查询,帮助你更好地理解和应用这一强大的工具。 一、Elasticsearch的简单查询 简单查询是ES中最基础的搜索方式,主要包括以下几种类型: 1. **全文本...
本主题聚焦于“Elasticsearch Java代码实现”,将深入探讨如何使用Java API来执行基本的操作,如创建索引、删除索引、更新索引、模糊搜索以及模糊全文搜索和精确查找。 首先,让我们从`ESManager.java`开始,这个类...
`es-sql.txt` 文件可能包含了使用 Elasticsearch SQL 查询语言编写的类似查询。Elasticsearch 从 6.0 版本开始支持 SQL 查询语法,使得对 ES 的查询更加直观,尤其是对于熟悉 SQL 的用户来说。在 SQL 查询中,我们...
本文将详细讲解如何使用 Elasticsearch 的脚本聚合来实现这一功能,并结合提供的文件 `EsPiPelineAggFilterForAggValueBucketTest.java` 和 `es-sql.txt` 来说明其具体应用。 首先,我们要理解Elasticsearch的脚本...
在本项目中,“对Elasticsearch-PHP进行查询语句封装 可实现链式调用 方便 es查询”,主要涉及到的知识点有: 1. **Elasticsearch查询语句**:Elasticsearch支持丰富的查询语法,如match查询、term查询、range查询...
ElasticSearch是一款功能强大且灵活的搜索引擎,它提供了丰富的Restful API来实现对ES的增删改查操作,以及复杂的聚合查询功能。在实际项目中,使用ElasticSearch可以轻松地实现数据的存储、检索和分析。 新增 在...
elasticsearch的客户端比较出名的就是elasticsearch head 和Kibana了, ... 而Kibana虽功能全面,但是启动麻烦,大部分功能用不上,很不灵活,所以采用vite2+vue3+ts+arco-design进行开发了一个elasticsearch的客户端。
ES 聚合查询结果转换成相应的对象集合,ES 聚合查询结果转换成相应的对象集合
springboot整合elasticsearch7,进行数据同步。elasticsearch相关度查询、排序。高亮显示;自动补全等功能。代码仅供参考,代码中有具体的注释,可以根据代码及注释内容,对自己项目架构及业务进行修改、整合。
elasticsearch聚合
Java实现Elasticsearch的简单实例主要涉及以下几个关键知识点: 1. **Elasticsearch基础**:Elasticsearch(ES)是一个开源的、分布式全文搜索引擎,它提供了实时数据分析的能力,广泛用于日志分析、监控、搜索应用...
本实例为博主原创,属于简单易上手并且能够拿来就用的SpringBoot ES 项目,全文使用的是ElasticsearchTemplate进行开发。 本实例涵盖ES中的各类操作,如索引操作、CRUD操作、批处理、结果排序、分页查询、检索查询、...
4.(后端技术篇java)ElasticSearch实现反向地址匹配服务(点周边查询服务) 5.(后端技术篇java)ElasticSearch实现矩形空间查询服务 6.(后端技术篇java)ElasticSearch实现圆形空间查询服务 7.(后端技术篇java)...
searchEngine 是基于 ElasticSearch 和 Java 实现的搜索引擎系统,实现关键字高亮搜索、添加文本等功能。 该项目集成了 Spring Boot、ElasticSearch、RestHighLevelClient、Vue.js、Element-ui、Log4j 和 Fastjson ...
聚合查询分页测试termsAgg.size(2147483647); //指定最大统计显示多少行步骤1:全量聚合,size设置为: 2147483647。 ES5.X/6.X版本设置为2147483647 ,它等于2^31-1,请看该地方代码
在Elasticsearch中,Terms聚合可以使用以下方式定义: ``` { "aggs" : { "genders" : { "terms" : { "field" : "gender" } } } } ``` 在上面的示例中,我们使用Terms聚合对gender字段进行分组统计。Elastic...