编写不易,转载请注明( http://shihlei.iteye.com/blog/2339398)!
一 前言
ES 做简单的条件查询,条件删除,在2.4版没有提供,script只提供的update的方案,自己简单封装了下。做面向对象的使用。
二 依赖
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> <version>2.4.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.20</version> </dependency>
三 程序
连接客户端:
public class EsTcpClient { private static final String ES_HOST = "localhost"; private static final int ES_TCP_PORT = 9300; private static TransportClient client; /** * 获取TCP 客户端 * * @return */ public static synchronized TransportClient getClient() { if (client == null) { build(); } return client; } /** * 关闭客户端 */ public static void close(TransportClient client) { if (client != null) { client.close(); } } /** * 建立连接 * * @return */ private static void build() { try { //特别注意:如果cluster 起了名字,需要在连接时指定名字,否则验证客户端连接的不是默认集群elasticsearch,会忽略,则无法找到节点 Settings settings = Settings.settingsBuilder() .put("cluster.name", "mycluster").build(); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ES_HOST), ES_TCP_PORT)); // .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300)); } catch (UnknownHostException e) { throw new RuntimeException(e); } } }
Template
public class EsTemplate<T extends EsTemplate.EsBean> { //TCP连接客户端: private TransportClient client = null; public EsTemplate(TransportClient client) { this.client = client; } /** * 新增 * * @param doc * @return */ public boolean insert(T doc) { String json = JSON.toJSONString(doc); IndexResponse response = client.prepareIndex(doc.esIndics(), doc.esType(), doc.esId()).setSource(json).get(); return response.isCreated(); } /** * 替换 * * @param doc * @return */ public boolean replace(T doc) { return update(doc); } /** * 更新 * * @param doc * @return */ public boolean update(T doc) { String json = JSON.toJSONString(doc); UpdateResponse response = client.prepareUpdate(doc.esIndics(), doc.esType(), doc.esId()) .setDoc(json) .get(); return !response.isCreated(); } /** * 删除 * * @param doc * @return */ public boolean delete(T doc) { DeleteResponse response = client.prepareDelete(doc.esIndics(), doc.esType(), doc.esId()).get(); return response.isFound(); } /** * 查询 * * @param indics 索引名 * @param type 类型 * @param id id * @param docClass 检索类型 * @return */ public T searchById(String indics, String type, String id, Class<T> docClass) { GetResponse response = client.prepareGet(indics, type, id).get(); if (response.isExists()) { String json = response.getSourceAsString(); return JSON.parseObject(json, docClass); } return null; } /** * 条件查询 * * @param criterias * @return */ public List<T> search(String indics, String type, Collection<Criteria> criterias, Class<T> docClass) { Function<SearchHit, T> resultConverter = searchHit -> { if (searchHit.isSourceEmpty()) { return null; } String json = searchHit.getSourceAsString(); return JSON.parseObject(json, docClass); }; return executeSearch(indics, type, criterias, true, resultConverter); } //批量部分------------------------------------------------------------------------------ /** * 批量插入更新 * * @param docs * @return */ public int batchInsert(Collection<T> docs) { if (docs == null || docs.isEmpty()) { return 0; } //构建批量插入 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (T doc : docs) { String json = JSON.toJSONString(doc); bulkRequestBuilder.add(client.prepareIndex(doc.esIndics(), doc.esType(), doc.esId()).setSource(json)); } }; executeBulk(buildRequestFunction); return docs.size(); } /** * 批量更新 * * @param docs * @return */ public int batchUpdate(Collection<T> docs) { if (docs == null || docs.isEmpty()) { return 0; } //构建批量更新 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (T doc : docs) { String json = JSON.toJSONString(doc); bulkRequestBuilder.add(client.prepareUpdate(doc.esIndics(), doc.esType(), doc.esId()) .setDoc(json)); } }; executeBulk(buildRequestFunction); return docs.size(); } /** * 批量删除 * * @param docs * @return */ public int batchDelete(Collection<T> docs) { if (docs == null || docs.isEmpty()) { return 0; } //构建批量更新 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (T doc : docs) { String json = JSON.toJSONString(doc); bulkRequestBuilder.add(client.prepareDelete(doc.esIndics(), doc.esType(), doc.esId())); } }; executeBulk(buildRequestFunction); return docs.size(); } /** * 条件更新 * * @param doc 文档 * @param criterias 查询条件 * @return 更新数量 */ public int updateByQuery(T doc, Collection<Criteria> criterias) { client.admin().indices().prepareRefresh().execute().actionGet(); final List<String> documentIds = executeSearch(doc.esIndics(), doc.esType(), criterias, false, searchHit -> searchHit.getId()); if (documentIds.isEmpty()) { return 0; } String json = JSON.toJSONString(doc); //构建批量更新 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (String id : documentIds) { UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(doc.esIndics(), doc.esType(), id) .setDoc(json); bulkRequestBuilder.add(updateRequestBuilder); } }; executeBulk(buildRequestFunction); return documentIds.size(); } /** * 条件删除 * * @param criterias * @return */ public int deleteByQuery(String indics, String type, Collection<Criteria> criterias) { client.admin().indices().prepareRefresh().execute().actionGet(); final List<String> documentIds = executeSearch(indics, type, criterias, false, searchHit -> searchHit.getId()); if (documentIds.isEmpty()) { return 0; } //构建批量删除 Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> { for (String id : documentIds) { bulkRequestBuilder.add(client.prepareDelete(indics, type, id)); } }; executeBulk(buildRequestFunction); return documentIds.size(); } /** * 执行批量操作 * * @param buildRequestFunction 构建处理 */ private void executeBulk(Consumer<BulkRequestBuilder> buildRequestFunction) { BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); buildRequestFunction.accept(bulkRequestBuilder); BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); if (bulkResponse.hasFailures()) { throw new RuntimeException(bulkResponse.buildFailureMessage()); } } //执行查询 private <E> List<E> executeSearch(String indics, String type, Collection<Criteria> criterias, boolean needSource, Function<SearchHit, E> handleHitfunction) { List<E> results = new LinkedList<>(); //指定查询的库表 SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indics); searchRequestBuilder.setTypes(type); searchRequestBuilder.setSize(9999); if (!needSource) { searchRequestBuilder.addFields(); } if (criterias != null && !criterias.isEmpty()) { //构建查询条件必须嵌入filter中! BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); for (Criteria c : criterias) { boolQueryBuilder.filter(QueryBuilders.termQuery(c.getFieldName(), c.getFieldValue())); } searchRequestBuilder.setQuery(boolQueryBuilder); } //请求 SearchResponse searchResponse = searchRequestBuilder.get(); //响应 SearchHits hits = searchResponse.getHits(); //无查询结果 if (hits.totalHits() > 0) { SearchHit[] hitList = hits.getHits(); for (SearchHit searchHit : hitList) { E e = handleHitfunction.apply(searchHit); if (e != null) { results.add(e); } } } return results; } /** * 查询条件 */ public static class Criteria { private String fieldName; private Object fieldValue; public Criteria(String fieldName, Object fieldValue) { this.fieldName = fieldName; this.fieldValue = fieldValue; } public String getFieldName() { return fieldName; } public void setFieldName(String fieldName) { this.fieldName = fieldName; } public Object getFieldValue() { return fieldValue; } public void setFieldValue(Object fieldValue) { this.fieldValue = fieldValue; } } /** * ES操作Bean */ public static abstract class EsBean { @JSONField(serialize = false, deserialize = false) protected Collection<Criteria> criterias = new LinkedList<>(); public Collection<Criteria> getCriterias() { return criterias; } /** * 获取索引库名 * * @return 索引库名 */ public abstract String esIndics(); /** * 获取类型 * * @return 索引类型名 */ public abstract String esType(); /** * 获取doc id * * @return 文档id */ public abstract String esId(); /** * 添加操作条件 * * @param criteria 条件 */ public void addCriteria(Criteria criteria) { criterias.add(criteria); } } }
四 使用
public class EsTset { public static void main(String[] args) { TransportClient client = EsTcpClient.getClient(); EsTemplate<TestEsBean> testEsBeanEsTemplate = new EsTemplate<>(client); //插入 Collection<TestEsBean> testEsBeens = new LinkedList<>(); for(int i=0;i<10;i++){ testEsBeens.add(new TestEsBean(String.valueOf(i),"name"+i,i)); } testEsBeanEsTemplate.batchInsert(testEsBeens); //查询 Collection<TestEsBean> dbTestEsBeans = testEsBeanEsTemplate.search("testDb","testBean", Arrays.asList(new EsTemplate.Criteria("age",10)),TestEsBean.class); for(TestEsBean bean: dbTestEsBeans){ System.out.println(bean); } //删除 testEsBeanEsTemplate.deleteByQuery("testDb","testBean", Arrays.asList(new EsTemplate.Criteria("age",1))); } public static class TestEsBean extends EsTemplate.EsBean { private String id; private String name; private int age; public TestEsBean(String id, String name, int age) { this.id = id; this.name = name; this.age = age; } @Override public String toString() { return "TestEsBean{" + "id='" + id + '\'' + ", name='" + name + '\'' + ", age=" + age + '}'; } /** * 获取索引库名 * * @return 索引库名 */ @Override public String esIndics() { return "testDb"; } /** * 获取类型 * * @return 索引类型名 */ @Override public String esType() { return "testBean"; } /** * 获取doc id * * @return 文档id */ @Override public String esId() { return id; } } }
相关推荐
**Elasticsearch 2.4 与 IK 分词器** Elasticsearch 是一款高度可扩展的开源全文搜索引擎,它提供了一种分布式、RESTful 风格的搜索和数据分析引擎,能够快速处理大量数据并进行复杂的搜索操作。版本 2.4 是 ...
本项目将详细讲解如何利用SpringBoot整合Kafka和Elasticsearch,实现日志的批量拉取和更新。 首先,我们需要在SpringBoot项目中引入相应的依赖。对于Kafka,我们需要添加`spring-kafka`依赖,它提供了与Kafka交互的...
Java做客户端对Elasticsearch服务的增删改查及批量修改操作,代码简洁易懂,思路清晰有注释.详情参考https://blog.csdn.net/linhaiyun_ytdx/article/category/7042758
**Elasticsearch 2.4 知识点详解** Elasticsearch 2.4 是一个高性能、可扩展的全文搜索引擎,基于 Lucene 库构建,主要用于处理海量数据的实时搜索和分析。它不仅是一个搜索引擎,还是一个分布式、RESTful 风格的...
**Elasticsearch 2.4.1:分布式搜索引擎的核心特性** Elasticsearch 是一个开源的、分布式的全文搜索引擎,以其高效、灵活和可扩展性在IT行业中广泛应用。2.4.1版本是Elasticsearch的一个稳定版本,它包含了对之前...
docker实现elasticsearch批量dump导出导入,实现es批量导入导出
**Elasticsearch 2.4.6:分布式搜索引擎与日志分析** Elasticsearch 是一个开源的、基于 Lucene 的全文搜索引擎,它提供了一个分布式、RESTful 风格的搜索和数据分析引擎,用于处理海量数据,特别是对于日志管理和...
es-head是一个针对Elasticsearch的可视化操作插件。它提供了一个便捷的操作工具,可以连接Elasticsearch搜索引擎,并提供可视化的操作页面,对Elasticsearch进行各种设置和数据检索功能的管理。 es-head 插件可以在...
同时,根据实际需求,你还可以扩展这些方法,比如支持更复杂的查询、批量操作等。 最后,别忘了在应用结束时关闭客户端: ```java client.close(); ``` 通过这样的封装,你可以轻松地在Java应用程序中与Elastic...
消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 <groupId>org.elasticsearch <artifactId>elastic...
Elasticsearch SQL插件是为Elasticsearch设计的一个强大工具,允许用户通过SQL查询语言来操作Elasticsearch的数据。此插件的版本为2.4.3.0,提供了与传统关系型数据库相似的交互方式,使那些熟悉SQL语法的用户能够更...
Elasticsearch SQL插件是为Elasticsearch设计的一款强大的工具,它使得用户能够通过SQL(结构化查询语言)来查询、分析存储在Elasticsearch索引中的数据。这个插件的版本是2.4.5.0,它提供了一种熟悉的方式来访问...
用于批量启动elasticsearch、logstash、kibana的批处理脚本
Linux环境下使用sqlplus工具将oracle中的数据导入到elasticsearch中。只需要在es_bulk_tool.properties配置sql即可实现数据的批量导入。在elasticsearch6中测试通过。shell脚本需要使用sqlplus。
在.NET Core中使用Elasticsearch,可以借助各种客户端库,实现与Elasticsearch服务器的交互,执行索引、查询、更新和删除等操作。 这个"TCT.Net.Base.ElasticSearch"库很可能是一个封装了Elasticsearch.NET和Nest的...
(狂神)ElasticSearch快速入门笔记,ElasticSearch基本操作以及爬虫(Java-ES仿京东实战),包含了小狂神讲的东西,特别适合新手学习,笔记保存下来可以多看看。好记性不如烂笔头哦~,ElasticSearch,简称es,es是一个...
首先,我的索引结构是酱紫的。 ...以上所述是小编给大家介绍的Python中elasticsearch插入和更新数据的实现方法,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!
**Elasticsearch与MySQL热更新IK词典项目详解** 在现代大数据检索和分析场景中,Elasticsearch(ES)作为一款强大的开源搜索引擎,被广泛应用于日志分析、全文搜索等领域。而IK(Intelligent Chinese)分词器是针对...
最后,`es`包可能是Elasticsearch相关的操作接口或抽象类,它们定义了与Elasticsearch交互的方法,如添加、更新、删除文档,以及查询等。例如: ```java public interface ElasticsearchRepository { void save...
Elasticsearch查询客户端是用于与ES服务器通信的软件,它们提供了多种语言的API,允许开发者以编程方式执行索引、搜索、更新和删除等操作。常见的Elasticsearch客户端包括: - **Jest**:一个轻量级的Java REST...