`

elasticsearch-river-kafka 插件的环境配置和使用

    博客分类:
  • ES
 
阅读更多

1.elasticsearch-river-kafka 插件的安装

elasticsearch-river-kafka 插件的安装与其他插件一样
cd $ELASTICSEARCH_HOME
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka
 
插件更新
cd $ELASTICSEARCH_HOME
./bin/plugin -remove elasticsearch-river-kafka
./bin/plugin -url file:/$PLUGIN_PATH -install elasticsearch-river-kafka

 

2.river节点的配置

配置river节点的时候,river节点和非river节点都要配置。
river节点:在es的配置文件中添加下面几行
#node.river: _none_    ##这一行要注释掉,表示为river节点
threadpool:
    bulk:
        type: fixed
        size: 60
        queue_size: 1000
 
非river节点:在es的配置文件中添加下面几行
node.river: _none_    ##这一行要解注,表示该节点不是river节点
threadpool:
    bulk:
        type: fixed
        size: 60
        queue_size: 1000
注意:一般,不会将数据落在river节点上(即node.data: false),但测试环境上就无所谓了,机器资源又紧张。
          节点配置完后,记得重启es,重启es的顺序:master节点→data节点→river节点

 

3.elasticsearch-river-kafka 插件的开发

社区中的elasticsearch-river-kafka 插件仅提供了对String和json数据的简单处理。在实际生产中,我们遇到的情况要复杂得多。
那么这个时候,我们就得自己去开发elasticsearch-river-kafka 插件实现一些附加功能。
下面就简单介绍一下开发elasticsearch-river-kafka 插件的步骤
1)KafkaRiverPlugin
该类需要继承KafkaRiverPlugin和实现AbstractPlugin,在该类中定义plugin的名称和描述
@Override
    public String name() {
        return "river-kafka";
    }
    @Override
    public String description() {
        return "River Kafka Plugin";
    }
 
2)es-plugin.properties配置文件
需要在es-plugin.properties中添加如下的定义,这样ES在启动的时候就能够通过org.elasticsearch.plugins.PluginManager
在当前的classpath中扫描到我们的plugin。
注意:定义中要写KafkaRiverPlugin类的全称,es-plugin.properties一般位于src/main/resources下
plugin=com.test.elasticsearch.plugin.river.kafka.KafkaRiverPlugin
 
3)KafkaRiverModule
KafkaRiverPlugin的onModule方法:在ES加载所有的插件时,会invoke一个onModule方法。KafkaRiverModule会作为参数传进来
public void onModule(RiversModule module) {
        module.registerRiver("kafka", KafkaRiverModule.class);
 }
KafkaRiverModule必须继承AbstractModule 。在KafkaRiverModule中会生成一个KafkaRiver。KafkaRiver是River接口的实现。
public class KafkaRiverModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(River.class).to(KafkaRiver.class).asEagerSingleton();
    }
}

 

4)KafkaRiver
    – KafkaRiver必须继承AbstractRiverComponent,并且实现River接口。
    – KafkaRiver只提供两个方法:start和close。
    – AbstractRiverComponent 用于initialize kafkariver的logger、river名、river的配置
    – 构造函数通过@Inject注入river所需要的一切东西:RiverName, RiverSettings、logger、自定义的配置信息
      (这里是BasicProperties,在BasicProperties中定义的配置参数可以在创建river的时候被指定,参见“4.kafka→river→es的数据存储”)
    – 在start方法中启动了kafkariver的线程。在这个线程中,将数据从kafka中读取数据,然后将这些数据写到es中。
    – kafkaConsumer用来定义从kafka中读取数据时的用户操作。
    – ElasticsearchProducer用来定义将数据写入ES时的用户操作。
public class KafkaRiver extends AbstractRiverComponent implements River {
    private BasicProperties properties;
    private KafkaConsumer kafkaConsumer;
    private ElasticsearchProducer elasticsearchProducer;
    private static  ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    private Thread riverMonitorThread;
    private KafkaRiverSubMonitor kafkaRiverSubMonitor;
    private Thread thread;
    private ESLogger logger;
    @Inject
    protected KafkaRiver(RiverName riverName, RiverSettings settings, Client client) {
        super(riverName, settings);
        this.logger = Loggers.getLogger(getClass(), settings.globalSettings(), riverName);
        properties = new BasicProperties(settings);
        elasticsearchProducer = new ElasticsearchProducer(client, properties);
        kafkaConsumer = new KafkaConsumer(riverName, properties, elasticsearchProducer);
    }
    @Override
 
    public void start() {
     //启动KafkaRiver的线程
        try {
            logger.info("MHA: Starting Kafka Worker...");
            thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "kafka_river").newThread(kafkaConsumer);
            thread.start();
        catch (Exception ex) {
            logger.error("Unexpected Error occurred", ex);
            throw new RuntimeException(ex);
        }
    }
     ......
}
 

4.kafka→river→es的数据存储

通过下面的指令,可以创建一条river,这样从kafka的baymaxtest的topic中的数据通过river就会落到es上。
注意:一个集群可以创建多个river,各river可以指定不同的topic、patition和序列化类
    "type""kafka",
    "kafka": {
        "topic" "test",
        "numOfConsumer" "2",
        "zk.connect" "10.10.10.10:2181",
        "zk.session.timeout.ms" "50000",
        "zk.sync.time.ms" "200",
        "zk.auto.commit.interval.ms" "1000",
        "zk.auto.commit.enable" "true",
        "zk.auto.offset.reset" "smallest",
        "zk.fetch.message.max.bytes" "5242880",
        "serializer" "com.test.elasticsearch.river.kafka.serializer.AASerializer"
    },
     "elasticsearch" : {
        "indexName" "stringfortest",
        "indexType" "message1",
        "batch_size" "500",
        "handling_batch_coresize" "2",
        "handling_batch_maximumPoolSize" "2",
        "handling_batch_keepAliveTime" "600",
        "handling_batch_queueSize" "10",
        "es_bulk_timeout" "5"
    }
}'
上述指令中主要配置信息的说明:
kafka中  →
topic:kafka的topic名为test,
numOfConsumer:从kafka中读取数据的消费者个数
zk.connect:zookper的host名
serializer:对从kafka中来的数据的序列化类
elasticsearch中  →
indexName:在es中生成的index名,从该river中通过的数据会落到这个index中
indexType:index的type
es_bulk_timeout:es批量处理的timeout
上述指令会返回下面的结果
{"_index":"_river",
 "_type":"baymaxriver1",
 "_id":"_meta",
 "_version":1,
 "created":true
}
 
查看river的元数据http://ip:9200/_river/rivername/_meta
 
删除一条river

 

1
1
分享到:
评论

相关推荐

    elasticsearch-river-kafka:用于 Elasticsearch 的 Kafka River

    # this will create a file here: target/releases/elasticsearch-river-kafka-1.0.1-SNAPSHOT.zip PLUGIN_PATH=`pwd`/target/releases/elasticsearch-river-kafka-1.0.1-SNAPSHOT.zip 安装插件 cd $ELASTIC...

    Elasticsearch-HBase-River同步

    在使用Elasticsearch-HBase-River之前,你需要确保已经安装了Elasticsearch和HBase,并且它们都正常运行。接下来,下载Elasticsearch-HBase-River插件,将其解压到ES的plugins目录下。重启ES服务,插件就会自动加载...

    Elasticsearch-head谷歌插件谷歌插件.zip

    "es-head"是Elasticsearch-head的简写,它允许用户无需编写复杂的curl命令就能与Elasticsearch进行交互。通过这个插件,你可以查看索引的状态,监控节点健康状况,查看集群统计信息,甚至进行索引的创建、删除和映射...

    elasticsearch-river-neo4j-1.2.1.1.zip

    在实际应用中,Elasticsearch River Neo4j 插件的使用场景可能包括但不限于社交网络分析、推荐系统、知识图谱构建等。例如,在社交网络分析中,可以利用 Neo4j 存储用户之间的关系,利用 Elasticsearch 进行快速的...

    elasticsearch可视化工具elasticsearch-head谷歌离线安装插件

    解压文件 elasticsearch-head.zip 安装插件: google ---》更多工具----》扩展程序 打开Google的扩展程序,点击加载已解压的扩展程序,选择解压elasticsearch-head文件夹即可添加插件成功

    elasticsearch-river-jdbc-1.5.0.5.jar

    elasticsearch-river-jdbc-1.5.0.5.jar

    elasticsearch-analysis-dynamic-synonym-7.2.0

    为了提高搜索的准确性和便利性,Elasticsearch提供了丰富的分析插件,其中"elasticsearch-analysis-dynamic-synonym-7.2.0"就是一款针对同义词处理的重要组件。 同义词分析插件在信息检索中扮演着关键角色,它允许...

    es-head Elasticsearch的可视化操作插件

    es-head是一个针对Elasticsearch的可视化操作插件。它提供了一个便捷的操作工具,可以连接Elasticsearch搜索引擎,并提供可视化的操作页面,对Elasticsearch进行各种设置和数据检索功能的管理。 es-head 插件可以在...

    ES客户端+谷歌浏览器插件+Multi-Elasticsearch-Head

    使用Multi-Elasticsearch-Head,管理员或开发者可以轻松地查看和对比不同集群的状态、性能指标以及数据分布,这对于分布式系统监控和故障排查特别有用。它通常提供了一种直观的方式来查看索引结构、执行查询语句、...

    elasticsearch-sql-2.4.3.0.zip 插件 安装包

    通常,Elasticsearch插件会安装在`$ES_HOME/plugins`目录下,其中`$ES_HOME`是Elasticsearch安装目录。 4. **离线安装**: 由于在线安装可能会因为网络问题导致失败,我们选择离线安装方式。在Elasticsearch的安装...

    elasticSearch-head-0.1.5 Chrome 谷歌浏览器插件 免费下载

    ElasticSearch Head是一款基于Chrome浏览器的插件,用于可视化地管理和监控Elasticsearch集群。它提供了用户友好的界面,使得用户无需编写复杂的查询语句就能查看、操作索引、节点、文档等信息,大大简化了Elastic...

    elasticsearch-7.12.1插件

    其中包括三个:elasticsearch-analysis-pinyin-7.12.1,elasticsearch-analysis-ik-7.12.1,elasticsearch-analysis-dynamic-synonym-7.12.1

    elasticsearch-head谷歌插件

    4. **节省资源**:作为独立的Web应用,Head插件不会占用Elasticsearch服务器的资源,避免了直接安装在ES上的潜在问题。 **二、使用Elasticsearch-Head谷歌插件** 1. **安装与配置**:由于是谷歌插件,用户可以通过...

    ElasticSearch-head插件

    总的来说,Elasticsearch-head插件是一个轻量级且实用的Elasticsearch可视化工具,尤其适合初学者和小型项目使用。尽管其功能相比Kibana有所欠缺,但对于理解Elasticsearch的基本操作和数据结构,仍是一个非常有价值...

    elasticsearch-analysis-ik-7.10.0.zip下载

    "elasticsearch-analysis-ik"是针对Elasticsearch的一个中文分词插件,它的主要功能是提供高效、精准的中文分词能力,使得Elasticsearch能够更好地理解和处理中文文本数据。 在Elasticsearch 7.10.0版本中,...

    elasticSearch(ES)最新版 ik分词插件7.10 elasticsearch-analysis-ik-7.10.0

    在中文环境下,为了实现精确的分词和搜索,我们需要安装适合版本的分词插件,如“elasticsearch-analysis-ik”。这个插件是为Elasticsearch设计的中文分词器,能够对中文文本进行有效的分词处理,提高搜索的准确性和...

    ES同义词插件 elasticsearch-analysis-dynamic-synonym-6.5.1.rar

    在IT领域,尤其是在搜索引擎优化和大数据分析中,Elasticsearch(ES)是一个广泛使用的开源全文检索引擎。它基于Lucene库,提供了分布式、实时、高可用性以及容错能力的数据存储和搜索解决方案。本篇文章将重点讲解...

    elasticsearch-head插件

    这款插件设计简洁,易于使用,可以帮助用户更好地理解、调试和优化Elasticsearch集群。 **一、安装Elasticsearch-Head** 1. **获取源代码** 首先,你需要从GitHub仓库下载elasticsearch-head的源代码。在本例中,...

    elasticsearch-jieba-plugin 8.8.2.zip

    总的来说,`elasticsearch-jieba-plugin`是Elasticsearch处理中文数据的得力助手,它让ES具备了强大的中文分词能力,极大地拓宽了其在中文环境下的应用场景。正确安装和配置后,你可以享受到更高效、更精准的中文...

    elasticsearch-head-5.0.0.zip

    因此,如果你正在使用的是较新版本的Elasticsearch,可能需要寻找替代的可视化工具,如Kibana,它已经成为Elastic Stack的重要组成部分,提供了更全面的功能和更好的集成。 总之,elasticsearch-head-5.0.0.zip是...

Global site tag (gtag.js) - Google Analytics