`
Javahuhui
  • 浏览: 80059 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

KafKa服务搭建

阅读更多

事先安装好zookeeper

 

1.下载kafka:http://kafka.apache.org/downloads.html

这里我下载的是kafka_2.11-0.11.0.1.tgz

 

2.解压

tar -xzf kafka_2.11-0.11.0.1.tgz

解压后的目录结构

 

3.修改配置config/server.properties

主要修改:

broker.id=1

port=9092

host.name=broker的主机地址

#zookeeper主机地址和端口

zookeeper.connect=ip1:port1,ip2:port2,ip3:port3

详细参数说明参照:http://blog.csdn.net/lizhitao/article/details/25667831

 

4.启动kafka

bin/kafka-server-start.sh config/server.properties &

 

5.发送消息测试

启动producer

bin/kafka-console-producer.sh --broker-list localost:9092 --topic test

注意:此处localhost为本机IP,否则报错

启动后随便输入消息内容

>kafka消息发送测试

 

打开另外窗口,启动consumer

bin/kafka-console-consumer.sh --zookeeper localhost:port --topic test --from-beginning

注意:同样localhost和port是zookeeper服务IP和端口,有多个就用逗号“,”隔开

 

6.配置集群

在这里配置的是伪集群

拷贝配置文件

cp config/server.properties config/server-2.properties

修改参数:

broker.id=2

port=9093

host.name=broker的主机地址

 

7.启动新节点

bin/kafka-server-start.sh config/server-2.properties &

 

8.Java开发使用

1)引用相关jar包

<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka_2.9.2</artifactId>
			<version>0.8.1.1</version>
			<exclusions>
				<!-- 实际应用中单独引入下面的jar包,不使用kafka带的 -->
				<exclusion>
					<artifactId>zookeeper</artifactId>
					<groupId>org.apache.zookeeper</groupId>
				</exclusion>
				<exclusion>
					<artifactId>zkclient</artifactId>
					<groupId>com.101tec</groupId>
				</exclusion>
				<exclusion>
					<artifactId>slf4j-api</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>

		<!-- Zookeeper客户端 -->
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.4</version>
			<exclusions>
				<exclusion>
					<artifactId>log4j</artifactId>
					<groupId>log4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>

 

 

具体代码参照:http://www.cnblogs.com/lilixin/p/5775877.html

这里给出关键配置:

kafka.properties

zookeeper.connect=192.168.1.190:2181,192.168.1.190:2182,192.168.1.190:2183
#zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181
metadata.broker.list=192.168.1.190:9092,192.168.1.190:9093
#metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092
 
#zookeeper.connect.timeout=15000
#zookeeper.session.timeout.ms=15000
#zookeeper.sync.time.ms=20000
#auto.commit.interval.ms=20000
#auto.offset.reset=smallest
#serializer.class=kafka.serializer.StringEncoder
#producer.type=async
#queue.buffering.max.ms=6000
 
test.group.id=huhui
kafka.test.topics=huhui

 

applicationContext.xml 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
	xmlns:context="http://www.springframework.org/schema/context"
	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
	xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
	xsi:schemaLocation="
        http://www.springframework.org/schema/beans 
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/mvc 
        http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
        http://www.springframework.org/schema/context 
        http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
        http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd"
	default-autowire="byName">

	<!-- 这个是加载给spring 用的. -->
	<bean id="propertyConfigurer"
		class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
		<property name="locations">
			<list>
				<value>classpath:kafka.properties</value>
			</list>
		</property>
	</bean>
	<!-- 这个是用来在代码中注入用的. -->
	<bean id="configProperties"
		class="org.springframework.beans.factory.config.PropertiesFactoryBean">
		<property name="locations">
			<list>
				<value>classpath:kafka.properties</value>
			</list>
		</property>
	</bean>
	
	<!-- kafka -->
	<import resource="applicationContext-kafka-producer.xml"/>
	<import resource="applicationContext-kafka-receiver.xml"/>

</beans>

 

applicationContext-kafka-producer.xml 

<bean id="topProducer" class="top.lilixin.TopProducer">
         <constructor-arg index="0" value="${metadata.broker.list}" />
    </bean>    

 

applicationContext-kafka-receiver.xml 

<!-- 定义消息处理器 -->
	<bean id="testConsumer" class="top.lilixin.TestConsumer"></bean>

	<!-- 定义收信人 receiver -->
	<bean id="topReceiver" class="top.lilixin.TopReceiver">

		<constructor-arg index="0" value="${zookeeper.connect}" /><!-- 
			_zookeeper集群地址,如: zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181_ -->

		<constructor-arg index="1" value="${test.group.id}" /><!-- 
			_消费者所属组id字符串 ,如:vko_group_article_read_count_ -->

		<constructor-arg index="2" value="${kafka.test.topics}" /><!-- 
			_要消费的消息主题,如:vko_group_ -->

		<constructor-arg index="3" ref="testConsumer" /> <!--_上面定义的消息处理器_ -->
	</bean>

 

 

项目原代码中TopReceiver.java有点小问题:服务关闭后,重启服务会重复读取消息。

原代码:

// 目前每个topic都是2个分区
	     topicCountMap.put(topic,2);
	     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
	        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
	        for (final KafkaStream<byte[], byte[]> stream : streams) {
	        	new Thread(){
	        		public void run(){
	        			ConsumerIterator<byte[], byte[]> it = stream.iterator();
	    		        while(it.hasNext()){
	    		        	String msg = new String(it.next().message());
	    		        	try{
	    		        	 topConsumer.dealMsg(msg);
	    		        	}catch(Exception e){
	    		        		log.error("kafka vkoConsumer topic:{} 收到消息:{} 消费异常 xxxxxxxxxxxxxxxxxx", topic, msg,e);
	    		        	}
	    		        	log.info("kafka vkoConsumer topic:{} 收到消息:{}", topic, msg);
	    		        }
	        		}
	        	}.start();
	        	log.info("kafka vkoConsumer 启动完成:groupId:{},topic:{},zookeeperConnect:{}",groupId, topic, zookeeperConnect);
	        }

 

首先要知道的是,High Level Consumer在ZooKeeper上保存最新的offset(从指定的分区中读取)。这个offset基于consumer group名存储。

Consumer group名在Kafka集群上是全局性的,在启动新的consumer group的时候要小心集群上没有关闭的consumer。当一个consumer线程启动了,Kafka会将它加入到相同的topic下的相同consumer group里,并且触发重新分配。在重新分配时,Kafka将partition分配给consumer,有可能会移动一个partition给另一个consumer。如果老的、新的处理逻辑同时存在,有可能一些消息传递到了老的consumer上。

使用High Level Consumer,它应该是多线程的。消费者线程的数量跟tipic的partition数量有关,它们之间有一些特定的规则:

  • 如果线程数量大于主题的分区数量,一些线程将得不到任何消息
  • 如果分区数大于线程数,一些线程将得到多个分区的消息
  • 如果一个线程处理多个分区的消息,它接收到消息的顺序是不能保证的。比如,先从分区10获取了5条消息,从分区11获取了6条消息,然后从分区10获取了5条,紧接着又从分区10获取了5条,虽然分区11还有消息。
  • 添加更多了同consumer group的consumer将触发Kafka重新分配,某个分区本来分配给a线程的,从新分配后,有可能分配给了b线程。

Kafka不会再每次读取消息后马上更新zookeeper上的offset,而是等待一段时间。由于这种延迟,有可能消费者读取了一条消息,但没有更新offset。所以,当客户端关闭或崩溃后,从新启动时有些消息重复读取了。另外,broker宕机或其他原因导致更换了partition的leader,也会导致消息重复读取。

为了避免这种问题,你应该提供一个平滑的关闭方式,而不是使用kill -9

修改后:

private static final int THREAD_AMOUNT = 2;
……

// 目前每个topic都是2个分区
	     topicCountMap.put(topic,THREAD_AMOUNT);
	     Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap);
	        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
	        ExecutorService executor = Executors.newFixedThreadPool(THREAD_AMOUNT);
	        //使用ExecutorService来调度线程
	        for (int i = 0; i < streams.size(); i++) {
                KafkaStream<byte[], byte[]> kafkaStream = streams.get(i);
                executor.submit(new HanldMessageThread(kafkaStream, i));
	        	
	        	
	        	log.info("kafka vkoConsumer 启动完成:groupId:"+groupId+",topic:"+topic+",zookeeperConnect:"+zookeeperConnect);
	        }


//关闭consumer
	        try {
	            Thread.sleep(10000);
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
	        if (cc != null) {
	        	cc.shutdown();
	        }
	        if (executor != null) {
	            executor.shutdown();
	        }
	        try {
	            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
	            	log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
	            }
	        } catch (InterruptedException e) {
	        	log.info("Interrupted during shutdown, exiting uncleanly");
	        }

……
/**
	 * 具体处理message的线程
	 * @author Administrator
	 *
	 */
	class HanldMessageThread implements Runnable {
	 
	    private KafkaStream<byte[], byte[]> kafkaStream = null;
	    private int num = 0;
	     
	    public HanldMessageThread(KafkaStream<byte[], byte[]> kafkaStream, int num) {
	        super();
	        this.kafkaStream = kafkaStream;
	        this.num = num;
	    }
	 
	    public void run() {
	        ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
	        while(iterator.hasNext()) {
	            String message = new String(iterator.next().message());
	            System.out.println("Thread no: " + num + ", message: " + message);
	        }
	    }
	     
	}

 具体参照:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

 

补充:非常不错的Kafka教程

http://orchome.com/kafka/index

  • 大小: 3.7 KB
分享到:
评论

相关推荐

    kafka环境搭建

    Kafka 环境搭建 Kafka 是一个distributed ...搭建 Kafka 环境需要安装 Java 环境、Python 环境、Zookeeper 和 Kafka 服务,并配置相关的环境变量和配置文件。通过这些步骤,可以成功搭建一个单机的 Kafka 环境。

    linux服务下,kafka服务搭建使用

    liunx系统下,组件服务搭建,安装必备.Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据

    kafka集群搭建文档

    本篇文档将详细介绍如何在Linux环境中搭建Kafka集群,同时结合Hadoop和Spark的分布式安装,构建一个完整的数据处理平台。 首先,搭建Kafka集群的基础条件包括: 1. 至少一台Linux服务器,但为了高可用性,推荐多台...

    kafka集群搭建及测试.docx

    【Kafka集群搭建及测试】 Kafka是一种分布式流处理平台,常用于实时数据处理和大数据管道。本文档将详细介绍如何在三台Ubuntu 16虚拟机上搭建Kafka集群,并进行基本的测试,确保其正常运行。 **1. 准备工作** 在...

    kafka 库集成到vc 环境

    有时需要将kafka集成到vc环境中,应用于某种场景。 本资源是将kafka cpp库封装成生产者,...运行需要依赖搭建好的kafka服务器 搭建kafka服务器的文章可以从网上获取。 如果需要帮助请email联系:licence3519@126.com

    kafka集群搭建.pdf

    kafka集群搭建方案 kafka集群搭建是大数据处理和实时数据处理的重要组件。下面是kafka集群搭建的详细方案: 一、准备工作 1. 关闭防火墙 关闭防火墙是kafka集群搭建的前提条件。可以使用systemctl disable ...

    kafka集群搭建与使用

    Kafka 集群搭建与使用 Kafka 是一种高吞吐量的分布式发布订阅消息系统,使用 Scala 编写。Kafka 拥有作为一个消息系统应该具备的功能,但是确有着独特的设计。Kafka 集群的搭建和使用是基于 Kafka 的设计理念和架构...

    kafka集群搭建和使用Java写kafka生产者消费者

    5. **启动服务**: 先启动ZooKeeper,然后启动Kafka的每个节点。 6. **创建主题**:使用Kafka命令行工具`bin/kafka-topics.sh`创建主题,例如`kafka-create-topic.sh --topic my-topic --partitions 3 --replication...

    Kafka集群搭建(3台机)

    搭建Kafka集群涉及到对虚拟机的安装配置、JDK环境的搭建、Zookeeper的安装配置等关键步骤。下面详细介绍各个知识点。 首先,虚拟机的安装是搭建Kafka集群的基础。文中提到了使用VMWare来安装三台虚拟机,并分配了...

    kafka搭建与使用.doc

    Kafka 集群搭建与使用详解 Kafka 是一种分布式流媒体平台,由 Apache 开源项目提供。它主要用来构建实时数据管道和流媒体处理系统。本文档将详细介绍 Kafka 集群的搭建和使用,包括创建、删除、生产者、消费者等...

    kafka搭建资料及相关组件

    3. **启动Kafka**:在配置完成后,通过命令行启动Zookeeper(Kafka依赖的协调服务)和Kafka服务器。 4. **创建主题**:使用Kafka的管理工具`kafka-topics.sh`创建主题,指定主题名、分区数和副本数。 5. **生产者...

    kafka环境搭建并和spring整合

    ### Kafka环境搭建与Spring整合详解 #### 一、Kafka基本概念 Kafka是一款开源的分布式消息系统,它能够提供高吞吐量的数据管道和存储服务。为了更好地理解和使用Kafka,我们首先需要了解以下几个核心概念: 1. **...

    kafka集群搭建

    kafka分布式集群多服务器和单机部署,需安装zookeeper环境,

    搭建kafka集群详细教程

    总之,搭建Kafka集群涉及多个步骤,包括Zookeeper集群的配置、Kafka的安装与配置、主题创建以及服务启动。正确配置和管理Kafka集群对于实现高效、稳定的数据流处理至关重要。随着对Kafka的深入理解和实践,你可以...

    KAFKA集群搭建参考方案

    【Kafka集群搭建】Kafka是一款高吞吐量的分布式发布订阅消息系统,广泛应用于大数据实时处理和流计算领域。搭建Kafka集群是构建可靠、高效的数据传输平台的关键步骤。根据提供的信息,Kafka集群搭建有两种方式:在多...

    Kafka集群搭建1

    【Kafka集群搭建详解】 Apache Kafka是一个分布式流处理平台,常用于实时数据处理和消息传递。本教程将详细介绍如何在CentOS 6.5环境下搭建Kafka集群,使用的版本为kafka_2.10-0.10.0.0,依赖JDK 1.8.0_172。集群将...

    12-kafka环境搭建1

    【Kafka环境搭建详解】 Kafka是一款分布式流处理平台,常用于实时数据处理和消息传递。本文将详细介绍如何搭建一个Kafka集群,包括基础的安装、配置和启动步骤。 **一、Kafka集群搭建** 1. **安装与解压** - ...

    kafka分布式集群搭建

    ### Kafka分布式集群搭建详解 #### 一、概述 Kafka是一种高性能、分布式的消息发布与订阅系统,被广泛应用于日志收集、流处理、消息传递等多个领域。为了提高系统的可用性与扩展性,通常会采用分布式集群的方式...

Global site tag (gtag.js) - Google Analytics