`

Elasticsearch2.4 Template 批量操作,条件更新,删除

阅读更多

编写不易,转载请注明(  http://shihlei.iteye.com/blog/2339398)!

 

一 前言

    ES 做简单的条件查询,条件删除,在2.4版没有提供,script只提供的update的方案,自己简单封装了下。做面向对象的使用。    

 

二 依赖

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.20</version>
        </dependency>

 

三 程序

连接客户端:

public class EsTcpClient {

    private static final String ES_HOST = "localhost";
    private static final int ES_TCP_PORT = 9300;

    private static TransportClient client;

    /**
     * 获取TCP 客户端
     *
     * @return
     */
    public static synchronized TransportClient getClient() {
        if (client == null) {
            build();
        }
        return client;
    }

    /**
     * 关闭客户端
     */
    public static void close(TransportClient client) {
        if (client != null) {
            client.close();
        }
    }

    /**
     * 建立连接
     *
     * @return
     */
    private static void build() {
        try {
            //特别注意:如果cluster 起了名字,需要在连接时指定名字,否则验证客户端连接的不是默认集群elasticsearch,会忽略,则无法找到节点
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", "mycluster").build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ES_HOST), ES_TCP_PORT));
//                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
        } catch (UnknownHostException e) {
            throw new RuntimeException(e);
        }
    }

}

 

 

 

Template

 

 

public class EsTemplate<T extends EsTemplate.EsBean> {

    //TCP连接客户端:
    private TransportClient client = null;

    public EsTemplate(TransportClient client) {
        this.client = client;
    }

    /**
     * 新增
     *
     * @param doc
     * @return
     */
    public boolean insert(T doc) {
        String json = JSON.toJSONString(doc);
        IndexResponse response = client.prepareIndex(doc.esIndics(), doc.esType(), doc.esId()).setSource(json).get();
        return response.isCreated();
    }

    /**
     * 替换
     *
     * @param doc
     * @return
     */
    public boolean replace(T doc) {
        return update(doc);
    }

    /**
     * 更新
     *
     * @param doc
     * @return
     */
    public boolean update(T doc) {
        String json = JSON.toJSONString(doc);

        UpdateResponse response = client.prepareUpdate(doc.esIndics(), doc.esType(), doc.esId())
                .setDoc(json)
                .get();

        return !response.isCreated();
    }

    /**
     * 删除
     *
     * @param doc
     * @return
     */
    public boolean delete(T doc) {
        DeleteResponse response = client.prepareDelete(doc.esIndics(), doc.esType(), doc.esId()).get();
        return response.isFound();
    }

    /**
     * 查询
     *
     * @param indics   索引名
     * @param type     类型
     * @param id       id
     * @param docClass 检索类型
     * @return
     */
    public T searchById(String indics, String type, String id, Class<T> docClass) {
        GetResponse response = client.prepareGet(indics, type, id).get();
        if (response.isExists()) {
            String json = response.getSourceAsString();
            return JSON.parseObject(json, docClass);
        }
        return null;
    }

    /**
     * 条件查询
     *
     * @param criterias
     * @return
     */
    public List<T> search(String indics, String type, Collection<Criteria> criterias, Class<T> docClass) {
        Function<SearchHit, T> resultConverter = searchHit -> {
            if (searchHit.isSourceEmpty()) {
                return null;
            }
            String json = searchHit.getSourceAsString();
            return JSON.parseObject(json, docClass);
        };

        return executeSearch(indics, type, criterias, true, resultConverter);
    }

    //批量部分------------------------------------------------------------------------------

    /**
     * 批量插入更新
     *
     * @param docs
     * @return
     */
    public int batchInsert(Collection<T> docs) {
        if (docs == null || docs.isEmpty()) {
            return 0;
        }
        //构建批量插入
        Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> {
            for (T doc : docs) {
                String json = JSON.toJSONString(doc);
                bulkRequestBuilder.add(client.prepareIndex(doc.esIndics(), doc.esType(), doc.esId()).setSource(json));
            }
        };

        executeBulk(buildRequestFunction);
        return docs.size();
    }

    /**
     * 批量更新
     *
     * @param docs
     * @return
     */
    public int batchUpdate(Collection<T> docs) {
        if (docs == null || docs.isEmpty()) {
            return 0;
        }
        //构建批量更新
        Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> {
            for (T doc : docs) {
                String json = JSON.toJSONString(doc);
                bulkRequestBuilder.add(client.prepareUpdate(doc.esIndics(), doc.esType(), doc.esId())
                        .setDoc(json));
            }
        };

        executeBulk(buildRequestFunction);
        return docs.size();
    }

    /**
     * 批量删除
     *
     * @param docs
     * @return
     */
    public int batchDelete(Collection<T> docs) {
        if (docs == null || docs.isEmpty()) {
            return 0;
        }
        //构建批量更新
        Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> {
            for (T doc : docs) {
                String json = JSON.toJSONString(doc);
                bulkRequestBuilder.add(client.prepareDelete(doc.esIndics(), doc.esType(), doc.esId()));
            }
        };

        executeBulk(buildRequestFunction);
        return docs.size();
    }


    /**
     * 条件更新
     *
     * @param doc       文档
     * @param criterias 查询条件
     * @return 更新数量
     */
    public int updateByQuery(T doc, Collection<Criteria> criterias) {
        client.admin().indices().prepareRefresh().execute().actionGet();

        final List<String> documentIds = executeSearch(doc.esIndics(), doc.esType(), criterias, false, searchHit -> searchHit.getId());

        if (documentIds.isEmpty()) {
            return 0;
        }

        String json = JSON.toJSONString(doc);
        //构建批量更新
        Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> {
            for (String id : documentIds) {
                UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(doc.esIndics(), doc.esType(), id)
                        .setDoc(json);

                bulkRequestBuilder.add(updateRequestBuilder);
            }
        };

        executeBulk(buildRequestFunction);

        return documentIds.size();
    }

    /**
     * 条件删除
     *
     * @param criterias
     * @return
     */
    public int deleteByQuery(String indics, String type, Collection<Criteria> criterias) {
        client.admin().indices().prepareRefresh().execute().actionGet();

        final List<String> documentIds = executeSearch(indics, type, criterias, false, searchHit -> searchHit.getId());
        if (documentIds.isEmpty()) {
            return 0;
        }

        //构建批量删除
        Consumer<BulkRequestBuilder> buildRequestFunction = bulkRequestBuilder -> {
            for (String id : documentIds) {
                bulkRequestBuilder.add(client.prepareDelete(indics, type, id));
            }
        };

        executeBulk(buildRequestFunction);

        return documentIds.size();
    }

    /**
     * 执行批量操作
     *
     * @param buildRequestFunction 构建处理
     */
    private void executeBulk(Consumer<BulkRequestBuilder> buildRequestFunction) {
        BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();

        buildRequestFunction.accept(bulkRequestBuilder);

        BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();

        if (bulkResponse.hasFailures()) {
            throw new RuntimeException(bulkResponse.buildFailureMessage());
        }
    }


    //执行查询
    private <E> List<E> executeSearch(String indics, String type, Collection<Criteria> criterias, boolean needSource, Function<SearchHit, E> handleHitfunction) {
        List<E> results = new LinkedList<>();
        //指定查询的库表
        SearchRequestBuilder searchRequestBuilder = client.prepareSearch(indics);
        searchRequestBuilder.setTypes(type);

        searchRequestBuilder.setSize(9999);
        if (!needSource) {
            searchRequestBuilder.addFields();
        }

        if (criterias != null && !criterias.isEmpty()) {
            //构建查询条件必须嵌入filter中!
            BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
            for (Criteria c : criterias) {
                boolQueryBuilder.filter(QueryBuilders.termQuery(c.getFieldName(), c.getFieldValue()));
            }

            searchRequestBuilder.setQuery(boolQueryBuilder);
        }

        //请求
        SearchResponse searchResponse = searchRequestBuilder.get();

        //响应
        SearchHits hits = searchResponse.getHits();

        //无查询结果
        if (hits.totalHits() > 0) {
            SearchHit[] hitList = hits.getHits();
            for (SearchHit searchHit : hitList) {
                E e = handleHitfunction.apply(searchHit);
                if (e != null) {
                    results.add(e);
                }
            }
        }

        return results;
    }

    /**
     * 查询条件
     */
    public static class Criteria {
        private String fieldName;
        private Object fieldValue;

        public Criteria(String fieldName, Object fieldValue) {
            this.fieldName = fieldName;
            this.fieldValue = fieldValue;
        }

        public String getFieldName() {
            return fieldName;
        }

        public void setFieldName(String fieldName) {
            this.fieldName = fieldName;
        }

        public Object getFieldValue() {
            return fieldValue;
        }

        public void setFieldValue(Object fieldValue) {
            this.fieldValue = fieldValue;
        }
    }

    /**
     * ES操作Bean
     */
    public static abstract class EsBean {

        @JSONField(serialize = false, deserialize = false)
        protected Collection<Criteria> criterias = new LinkedList<>();

        public Collection<Criteria> getCriterias() {
            return criterias;
        }

        /**
         * 获取索引库名
         *
         * @return 索引库名
         */
        public abstract String esIndics();

        /**
         * 获取类型
         *
         * @return 索引类型名
         */
        public abstract String esType();

        /**
         * 获取doc id
         *
         * @return 文档id
         */
        public abstract String esId();

        /**
         * 添加操作条件
         *
         * @param criteria 条件
         */
        public void addCriteria(Criteria criteria) {
            criterias.add(criteria);
        }
    }
}

 

 

四 使用

public class EsTset {
    public static void main(String[] args) {
        TransportClient client = EsTcpClient.getClient();
        EsTemplate<TestEsBean> testEsBeanEsTemplate = new EsTemplate<>(client);

        //插入
        Collection<TestEsBean> testEsBeens = new LinkedList<>();
        for(int i=0;i<10;i++){
            testEsBeens.add(new TestEsBean(String.valueOf(i),"name"+i,i));
        }

        testEsBeanEsTemplate.batchInsert(testEsBeens);

        //查询
        Collection<TestEsBean> dbTestEsBeans = testEsBeanEsTemplate.search("testDb","testBean", Arrays.asList(new EsTemplate.Criteria("age",10)),TestEsBean.class);
        for(TestEsBean bean: dbTestEsBeans){
            System.out.println(bean);
        }

        //删除
        testEsBeanEsTemplate.deleteByQuery("testDb","testBean", Arrays.asList(new EsTemplate.Criteria("age",1)));

    }

    public static class TestEsBean extends EsTemplate.EsBean {
        private String id;
        private String name;
        private int age;

        public TestEsBean(String id, String name, int age) {
            this.id = id;
            this.name = name;
            this.age = age;
        }

        @Override
        public String toString() {
            return "TestEsBean{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }

        /**
         * 获取索引库名
         *
         * @return 索引库名
         */
        @Override
        public String esIndics() {
            return "testDb";
        }

        /**
         * 获取类型
         *
         * @return 索引类型名
         */
        @Override
        public String esType() {
            return "testBean";
        }

        /**
         * 获取doc id
         *
         * @return 文档id
         */
        @Override
        public String esId() {
            return id;
        }
    }
}

 

分享到:
评论

相关推荐

    elasticsearch2.4对应的ik分词器

    **Elasticsearch 2.4 与 IK 分词器** Elasticsearch 是一款高度可扩展的开源全文搜索引擎,它提供了一种分布式、RESTful 风格的搜索和数据分析引擎,能够快速处理大量数据并进行复杂的搜索操作。版本 2.4 是 ...

    springBoot整合kafka和elasticSearch,实现批量拉取日志以及批量更新到es里

    本项目将详细讲解如何利用SpringBoot整合Kafka和Elasticsearch,实现日志的批量拉取和更新。 首先,我们需要在SpringBoot项目中引入相应的依赖。对于Kafka,我们需要添加`spring-kafka`依赖,它提供了与Kafka交互的...

    Java做客户端对Elasticsearch服务的增删改查及批量修改操作

    Java做客户端对Elasticsearch服务的增删改查及批量修改操作,代码简洁易懂,思路清晰有注释.详情参考https://blog.csdn.net/linhaiyun_ytdx/article/category/7042758

    elasticsearch2.4

    **Elasticsearch 2.4 知识点详解** Elasticsearch 2.4 是一个高性能、可扩展的全文搜索引擎,基于 Lucene 库构建,主要用于处理海量数据的实时搜索和分析。它不仅是一个搜索引擎,还是一个分布式、RESTful 风格的...

    elasticsearch-2.4.1+head

    **Elasticsearch 2.4.1:分布式搜索引擎的核心特性** Elasticsearch 是一个开源的、分布式的全文搜索引擎,以其高效、灵活和可扩展性在IT行业中广泛应用。2.4.1版本是Elasticsearch的一个稳定版本,它包含了对之前...

    docker实现elasticsearch批量dump导出导入

    docker实现elasticsearch批量dump导出导入,实现es批量导入导出

    elasticsearch2.4.6

    **Elasticsearch 2.4.6:分布式搜索引擎与日志分析** Elasticsearch 是一个开源的、基于 Lucene 的全文搜索引擎,它提供了一个分布式、RESTful 风格的搜索和数据分析引擎,用于处理海量数据,特别是对于日志管理和...

    es-head Elasticsearch的可视化操作插件

    es-head是一个针对Elasticsearch的可视化操作插件。它提供了一个便捷的操作工具,可以连接Elasticsearch搜索引擎,并提供可视化的操作页面,对Elasticsearch进行各种设置和数据检索功能的管理。 es-head 插件可以在...

    Elasticsearch的JAVA操作工具类,包括增删改查的封装

    同时,根据实际需求,你还可以扩展这些方法,比如支持更复杂的查询、批量操作等。 最后,别忘了在应用结束时关闭客户端: ```java client.close(); ``` 通过这样的封装,你可以轻松地在Java应用程序中与Elastic...

    java语言kafka数据批量导入到Elasticsearch实例

    消费kafka数据,然后批量导入到Elasticsearch,本例子使用的kafka版本0.10,es版本是6.4,使用bulk方式批量导入到es中,也可以一条一条的导入,不过比较慢。 &lt;groupId&gt;org.elasticsearch &lt;artifactId&gt;elastic...

    elasticsearch-sql-2.4.3.0.zip 插件 安装包

    Elasticsearch SQL插件是为Elasticsearch设计的一个强大工具,允许用户通过SQL查询语言来操作Elasticsearch的数据。此插件的版本为2.4.3.0,提供了与传统关系型数据库相似的交互方式,使那些熟悉SQL语法的用户能够更...

    elasticsearch-sql-2.4.5.0.zip

    Elasticsearch SQL插件是为Elasticsearch设计的一款强大的工具,它使得用户能够通过SQL(结构化查询语言)来查询、分析存储在Elasticsearch索引中的数据。这个插件的版本是2.4.5.0,它提供了一种熟悉的方式来访问...

    用于批量启动elasticsearch、logstash、kibana的批处理脚本

    用于批量启动elasticsearch、logstash、kibana的批处理脚本

    Oracle数据批量导入elasticsearch脚本

    Linux环境下使用sqlplus工具将oracle中的数据导入到elasticsearch中。只需要在es_bulk_tool.properties配置sql即可实现数据的批量导入。在elasticsearch6中测试通过。shell脚本需要使用sqlplus。

    基于.netcore搜索封装ElasticSearch.zip

    在.NET Core中使用Elasticsearch,可以借助各种客户端库,实现与Elasticsearch服务器的交互,执行索引、查询、更新和删除等操作。 这个"TCT.Net.Base.ElasticSearch"库很可能是一个封装了Elasticsearch.NET和Nest的...

    (狂神)ElasticSearch快速入门笔记,ElasticSearch基本操作以及爬虫(Java-ES仿京东实战)

    (狂神)ElasticSearch快速入门笔记,ElasticSearch基本操作以及爬虫(Java-ES仿京东实战),包含了小狂神讲的东西,特别适合新手学习,笔记保存下来可以多看看。好记性不如烂笔头哦~,ElasticSearch,简称es,es是一个...

    Python中elasticsearch插入和更新数据的实现方法

    首先,我的索引结构是酱紫的。   ...以上所述是小编给大家介绍的Python中elasticsearch插入和更新数据的实现方法,希望对大家有所帮助,如果大家有任何疑问欢迎给我留言,小编会及时回复大家的!

    ElasticSearch基于Mysql热更新IK词典项目

    **Elasticsearch与MySQL热更新IK词典项目详解** 在现代大数据检索和分析场景中,Elasticsearch(ES)作为一款强大的开源搜索引擎,被广泛应用于日志分析、全文搜索等领域。而IK(Intelligent Chinese)分词器是针对...

    elasticSearch的操作demo

    最后,`es`包可能是Elasticsearch相关的操作接口或抽象类,它们定义了与Elasticsearch交互的方法,如添加、更新、删除文档,以及查询等。例如: ```java public interface ElasticsearchRepository { void save...

    ES查询客户端,elasticsearch可视化工具 elasticsearch查询客户端

    Elasticsearch查询客户端是用于与ES服务器通信的软件,它们提供了多种语言的API,允许开发者以编程方式执行索引、搜索、更新和删除等操作。常见的Elasticsearch客户端包括: - **Jest**:一个轻量级的Java REST...

Global site tag (gtag.js) - Google Analytics