`

elasticsearch 1.7集群配置

阅读更多
一、简介
 
ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持需要ZooKeeper的支持。
 
这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.com/
 
二、基本用法
 
Elasticsearch集群可以包含多个索引(indices),每一个索引可以包含多个类型(types),每一个类型包含多个文档(documents),然后每个文档包含多个字段(Fields),这种面向文档型的储存,也算是NoSQL的一种吧。
 
ES比传统关系型数据库,对一些概念上的理解:
 
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices   -> Types  -> Documents -> Fields
从创建一个Client到添加、删除、查询等基本用法:
 
1、创建Client
 
public ElasticSearchService(String ipAddress, int port) {
        client = new TransportClient()
                .addTransportAddress(new InetSocketTransportAddress(ipAddress,
                        port));
    }
这里是一个TransportClient。
 
ES下两种客户端对比:
 
TransportClient:轻量级的Client,使用Netty线程池,Socket连接到ES集群。本身不加入到集群,只作为请求的处理。
 
Node Client:客户端节点本身也是ES节点,加入到集群,和其他ElasticSearch节点一样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。
 
2、创建/删除Index和Type信息
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// 创建索引
public void createIndex() {
    client.admin().indices().create(new CreateIndexRequest(IndexName))
            .actionGet();
}
 
// 清除所有索引
public void deleteIndex() {
    IndicesExistsResponse indicesExistsResponse = client.admin().indices()
            .exists(new IndicesExistsRequest(new String[] { IndexName }))
            .actionGet();
    if (indicesExistsResponse.isExists()) {
        client.admin().indices().delete(new DeleteIndexRequest(IndexName))
                .actionGet();
    }
}
 
// 删除Index下的某个Type
public void deleteType(){
    client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet();
}
 
// 定义索引的映射类型
public void defineIndexTypeMapping() {
    try {
        XContentBuilder mapBuilder = XContentFactory.jsonBuilder();
        mapBuilder.startObject()
        .startObject(TypeName)
            .startObject("properties")
                .startObject(IDFieldName).field("type", "long").field("store", "yes").endObject()
                .startObject(SeqNumFieldName).field("type", "long").field("store", "yes").endObject()
                .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").field("store", "yes").endObject()
                .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject()
            .endObject()
        .endObject()
        .endObject();
 
        PutMappingRequest putMappingRequest = Requests
                .putMappingRequest(IndexName).type(TypeName)
                .source(mapBuilder);
        client.admin().indices().putMapping(putMappingRequest).actionGet();
    } catch (IOException e) {
        log.error(e.toString());
    }
}

 

这里自定义了某个Type的索引映射(Mapping),默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。
 
注意:针对字符串,ES默认会做“analyzed”处理,即先做分词、去掉stop words等处理再index。如果你需要把一个字符串做为整体被索引到,需要把这个字段这样设置:field("index", "not_analyzed")。
 
3、索引数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 批量索引数据
public void indexHotSpotDataList(List<Hotspotdata> dataList) {
    if (dataList != null) {
        int size = dataList.size();
        if (size > 0) {
            BulkRequestBuilder bulkRequest = client.prepareBulk();
            for (int i = 0; i < size; ++i) {
                Hotspotdata data = dataList.get(i);
                String jsonSource = getIndexDataFromHotspotData(data);
                if (jsonSource != null) {
                    bulkRequest.add(client
                            .prepareIndex(IndexName, TypeName,
                                    data.getId().toString())
                            .setRefresh(true).setSource(jsonSource));
                }
            }
 
            BulkResponse bulkResponse = bulkRequest.execute().actionGet();
            if (bulkResponse.hasFailures()) {
                Iterator<BulkItemResponse> iter = bulkResponse.iterator();
                while (iter.hasNext()) {
                    BulkItemResponse itemResponse = iter.next();
                    if (itemResponse.isFailed()) {
                        log.error(itemResponse.getFailureMessage());
                    }
                }
            }
        }
    }
}
 
// 索引数据
public boolean indexHotspotData(Hotspotdata data) {
    String jsonSource = getIndexDataFromHotspotData(data);
    if (jsonSource != null) {
        IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName,
                TypeName).setRefresh(true);
        requestBuilder.setSource(jsonSource)
                .execute().actionGet();
        return true;
    }
 
    return false;
}
 
// 得到索引字符串
public String getIndexDataFromHotspotData(Hotspotdata data) {
    String jsonString = null;
    if (data != null) {
        try {
            XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
            jsonBuilder.startObject().field(IDFieldName, data.getId())
                    .field(SeqNumFieldName, data.getSeqNum())
                    .field(IMSIFieldName, data.getImsi())
                    .field(IMEIFieldName, data.getImei())
                    .field(DeviceIDFieldName, data.getDeviceID())
                    .field(OwnAreaFieldName, data.getOwnArea())
                    .field(TeleOperFieldName, data.getTeleOper())
                    .field(TimeFieldName, data.getCollectTime())
                    .endObject();
            jsonString = jsonBuilder.string();
        } catch (IOException e) {
            log.equals(e);
        }
    }
 
    return jsonString;
}

 

ES支持批量和单个数据索引。
 
4、查询获取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 获取少量数据100个
private List<Integer> getSearchData(QueryBuilder queryBuilder) {
    List<Integer> ids = new ArrayList<>();
    SearchResponse searchResponse = client.prepareSearch(IndexName)
            .setTypes(TypeName).setQuery(queryBuilder).setSize(100)
            .execute().actionGet();
    SearchHits searchHits = searchResponse.getHits();
    for (SearchHit searchHit : searchHits) {
        Integer id = (Integer) searchHit.getSource().get("id");
        ids.add(id);
    }
    return ids;
}
 
// 获取大量数据
private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) {
    List<Integer> ids = new ArrayList<>();
    // 一次获取100000数据
    SearchResponse scrollResp = client.prepareSearch(IndexName)
            .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000))
            .setQuery(queryBuilder).setSize(100000).execute().actionGet();
    while (true) {
        for (SearchHit searchHit : scrollResp.getHits().getHits()) {
            Integer id = (Integer) searchHit.getSource().get(IDFieldName);
            ids.add(id);
        }
        scrollResp = client.prepareSearchScroll(scrollResp.getScrollId())
                .setScroll(new TimeValue(600000)).execute().actionGet();
        if (scrollResp.getHits().getHits().length == 0) {
            break;
        }
    }
 
    return ids;
}

 

这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也可以一次性获取大量数据,需要使用Scroll Search。
 
5、聚合(Aggregation Facet)查询 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 得到某段时间内设备列表上每个设备的数据分布情况<设备ID,数量>
public Map<String, String> getDeviceDistributedInfo(String startTime,
        String endTime, List<String> deviceList) {
 
    Map<String, String> resultsMap = new HashMap<>();
 
    QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList);
    QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime);
    QueryBuilder queryBuilder = QueryBuilders.boolQuery()
            .must(deviceQueryBuilder).must(rangeBuilder);
 
    TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE)
            .field(DeviceIDFieldName);
    SearchResponse searchResponse = client.prepareSearch(IndexName)
            .setQuery(queryBuilder).addAggregation(termsBuilder)
            .execute().actionGet();
    Terms terms = searchResponse.getAggregations().get("DeviceIDAgg");
    if (terms != null) {
        for (Terms.Bucket entry : terms.getBuckets()) {
            resultsMap.put(entry.getKey(),
                    String.valueOf(entry.getDocCount()));
        }
    }
    return resultsMap;
}

 

Aggregation查询可以查询类似统计分析这样的功能:如某个月的数据分布情况,某类数据的最大、最小、总和、平均值等。
 
三、集群配置
 
配置文件elasticsearch.yml
 
集群名和节点名:
 
#cluster.name: elasticsearch
 
#node.name: "Franz Kafka"
 
是否参与master选举和是否存储数据
 
#node.master: true
 
#node.data: true
 
分片数和副本数
 
#index.number_of_shards: 5
#index.number_of_replicas: 1
 
master选举最少的节点数,这个一定要设置为整个集群节点个数的一半加1,即N/2+1
 
#discovery.zen.minimum_master_nodes: 1
 
discovery ping的超时时间,拥塞网络,网络状态不佳的情况下设置高一点
 
#discovery.zen.ping.timeout: 3s
 
注意,分布式系统整个集群节点个数N要为奇数个!!
 
四、Elasticsearch插件
 
1、elasticsearch-head是一个elasticsearch的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head
 
2、elasticsearch-sql:使用SQL语法查询elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip --install sql
 
github地址:https://github.com/NLPchina/elasticsearch-sql
 
3、elasticsearch-bigdesk是elasticsearch的一个集群监控工具,可以通过它来查看ES集群的各种状态。
 
安装:./bin/plugin -install lukas-vlcek/bigdesk
 
访问:http://192.103.101.203:9200/_plugin/bigdesk/,
 
4、elasticsearch-servicewrapper插件是ElasticSearch的服务化插件,
 
在https://github.com/elasticsearch/elasticsearch-servicewrapper下载该插件后,解压缩,将service目录拷贝到elasticsearch目录的bin目录下。
 
而后,可以通过执行以下语句安装、启动、停止ElasticSearch:
 
sh elasticsearch install
 
sh elasticsearch start
 
sh elasticsearch stop
分享到:
评论

相关推荐

    Elasticsearch集群.pdf

    下面我们就来深入探讨如何搭建一个基于Elasticsearch的集群环境,并实现一些高级功能,比如head插件的安装与使用,以及ES用户认证的创建。 首先,为了安装Elasticsearch,需要准备以下软件环境: - Java环境:...

    elasticsearch-8.6.1.7z

    6. 集群管理:Elasticsearch集群通过选举主节点进行管理,主节点负责协调索引分配、集群状态更新等任务,确保数据的一致性和完整性。 7. 数据恢复与故障转移:Elasticsearch具有自动的故障检测和恢复机制,当某个...

    Elasticsearch 1.7.0 (for Windows Linux)

    2. **配置**:根据实际需求配置`elasticsearch.yml`配置文件,包括设置网络端口、路径、内存分配等。 3. **启动与停止**:使用命令行启动和停止Elasticsearch服务,或者通过系统服务管理工具进行管理。 4. **数据...

    elasticsearch2.4

    **Elasticsearch 2.4 知识点详解** Elasticsearch 2.4 是一个高性能、可...确保正确配置 JDK 1.7 或更高版本后,你可以通过下载并解压提供的 `elasticsearch-2.4.0` 文件,开始探索和利用 Elasticsearch 的强大功能。

    elasticsearch-jdbc-2.2.0.0-dist

    Elasticsearch基于Lucene构建,但提供了更高级别的API和集群管理功能,使得大规模数据处理变得更加容易。 **JDBC (Java Database Connectivity)** 是Java平台中用于与关系数据库交互的标准API。它允许Java应用程序...

    Elasticsearch 技术解析与实战.zip

    290 6.5.3 分片智能分配 291 6.5.4 分片配置过滤 292 6.5.5 其他集群配置 293 6.6 小结 293 第7章 索引分词器 294 7.1 分词器的概念 294 7.2 中文分词器 298 7.3 插件 300 7.3.1 插件管理 301 7.3.2 插件安装 301 ...

    ElasticSearch Java 中文文档 5.6

    文档中提到了性能调优的内容,建议开发者根据实际应用场景,通过合理配置索引、更新映射设置、监控集群状态等措施,优化ElasticSearch的性能。 此外,文档中提到了作者在实际使用ElasticSearch过程中遇到的“坑”,...

    elasticsearch-analysis-pinyin-7.3.2.zip

    "elasticsearch-analysis-pinyin-7.3.2"插件内部采用了nlp-lang-1.7.jar,这是一个专门处理语言的库,包含对中文字符的拼音转换算法。在索引文档时,这个分词器会将每个中文字符转化为对应的全拼或简拼,然后存储...

    elasticsearch-1.7.2.tar.gz

    2. Java运行环境:Elasticsearch是用Java编写的,因此需要JRE(Java Runtime Environment)或JDK(Java Development Kit)版本在1.7至1.8之间。安装前需确保已安装并配置好Java环境变量。 安装过程包括以下步骤: 1...

    elasticsearch-pinyin分词器

    此外,对于性能的考虑,可以选择合适的硬件资源和集群配置,确保Elasticsearch在处理大量拼音分词任务时仍能保持高效。 总结来说,"elasticsearch-pinyin分词器"是Elasticsearch处理中文搜索的重要工具,通过将汉字...

    elasticsearch Lnuix 环境搭建说明

    然而,如果服务器上有其他项目依赖于JDK 1.7版本,可以通过设置临时变量`ES_JAVA_HOME`来指定Elasticsearch使用的JDK路径。 **具体步骤**: 1. **编辑profile文件**:通过命令`vi /etc/profile`编辑`profile`文件...

    Kafka 2.9 版本 jdk1.7+

    此外,Kafka Connect允许你方便地将数据导入导出到其他数据存储,如Hadoop、Elasticsearch等。 在实际应用中,Kafka常被用于日志聚合、网站活动跟踪、流处理应用、微服务间的通信等多个场景。其高效的消息传递能力...

    elasticsearch.zip

    其中,`config`目录下的配置文件(如elasticsearch.yml)是设置Elasticsearch行为的关键,允许我们定制集群的网络、存储、安全等参数。 接下来是"elasticsearch-head.7z",这是一个基于网页的Elasticsearch界面工具...

    ElasticSearch搜索引擎使用

    #### 三、ElasticSearch安装与配置 - **下载**: 从Elasticsearch官网(https://www.elastic.co/products/elasticsearch)获取最新的版本。 - **安装**: 解压至指定目录,注意目录路径中不要包含中文或特殊字符。 - **...

    openshift-elasticsearch:Openshift 的弹性搜索盒。 它包括 Kibana 4 作为其仪表板并使用 Nginx 1.7 作为代理服务器

    此卡式盒提供 Elasticsearch 集群作为带有 Kibana 4.0 仪表板的独立应用程序。 Kibana 在根/ ,Elasticsearch 在/elasticsearch elasticsearch 上提供服务。 还配置了健康检查 url 并且位于/health 。 要创建您的 ...

    ElasticSearch

    2. **初始化客户端方式的变化**: 从1.7版本到2.x版本再到5.x版本,ElasticSearch初始化客户端的方式经历了多次变更。这种频繁的变化可能会给开发者带来一定的困扰,尤其是在项目迁移过程中需要特别注意。 #### 三、...

    flink-1.7-中文文档.pdf

    - **Elasticsearch连接器**:向Elasticsearch写入数据。 - **HDFS连接器**:读写HDFS文件系统。 - **流文件接收器**:处理来自文件系统的数据流。 - **RabbitMQ连接器**:与RabbitMQ消息队列集成。 - **Apache ...

    flume17ess522_0827_01

    标签“flume1.7+es”简洁明了地表达了这个主题的关键技术组合,即Flume的1.7版本与Elasticsearch的集成。 在压缩包子文件的文件名称列表中,“flume-ng-elasticsearch-sink”是Flume的一个插件,它使得Flume能够将...

Global site tag (gtag.js) - Google Analytics