事先安装好zookeeper
1.下载kafka:http://kafka.apache.org/downloads.html
这里我下载的是kafka_2.11-0.11.0.1.tgz
2.解压
tar -xzf kafka_2.11-0.11.0.1.tgz
解压后的目录结构
3.修改配置config/server.properties
主要修改:
broker.id=1
port=9092
host.name=broker的主机地址
#zookeeper主机地址和端口
zookeeper.connect=ip1:port1,ip2:port2,ip3:port3
详细参数说明参照:http://blog.csdn.net/lizhitao/article/details/25667831
4.启动kafka
bin/kafka-server-start.sh config/server.properties &
5.发送消息测试
启动producer
bin/kafka-console-producer.sh --broker-list localost:9092 --topic test
注意:此处localhost为本机IP,否则报错
启动后随便输入消息内容
>kafka消息发送测试
打开另外窗口,启动consumer
bin/kafka-console-consumer.sh --zookeeper localhost:port --topic test --from-beginning
注意:同样localhost和port是zookeeper服务IP和端口,有多个就用逗号“,”隔开
6.配置集群
在这里配置的是伪集群
拷贝配置文件
cp config/server.properties config/server-2.properties
修改参数:
broker.id=2
port=9093
host.name=broker的主机地址
7.启动新节点
bin/kafka-server-start.sh config/server-2.properties &
8.Java开发使用
1)引用相关jar包
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.9.2</artifactId> <version>0.8.1.1</version> <exclusions> <!-- 实际应用中单独引入下面的jar包,不使用kafka带的 --> <exclusion> <artifactId>zookeeper</artifactId> <groupId>org.apache.zookeeper</groupId> </exclusion> <exclusion> <artifactId>zkclient</artifactId> <groupId>com.101tec</groupId> </exclusion> <exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId> </exclusion> </exclusions> </dependency> <!-- Zookeeper客户端 --> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.4</version> <exclusions> <exclusion> <artifactId>log4j</artifactId> <groupId>log4j</groupId> </exclusion> </exclusions> </dependency>
具体代码参照:http://www.cnblogs.com/lilixin/p/5775877.html
这里给出关键配置:
kafka.properties
zookeeper.connect=192.168.1.190:2181,192.168.1.190:2182,192.168.1.190:2183 #zookeeper.connect=zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181 metadata.broker.list=192.168.1.190:9092,192.168.1.190:9093 #metadata.broker.list=kafka.server1.vko.cn:9092,kafka.server2.vko.cn:9092 #zookeeper.connect.timeout=15000 #zookeeper.session.timeout.ms=15000 #zookeeper.sync.time.ms=20000 #auto.commit.interval.ms=20000 #auto.offset.reset=smallest #serializer.class=kafka.serializer.StringEncoder #producer.type=async #queue.buffering.max.ms=6000 test.group.id=huhui kafka.test.topics=huhui
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" xmlns:context="http://www.springframework.org/schema/context" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:mybatis="http://mybatis.org/schema/mybatis-spring" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd" default-autowire="byName"> <!-- 这个是加载给spring 用的. --> <bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations"> <list> <value>classpath:kafka.properties</value> </list> </property> </bean> <!-- 这个是用来在代码中注入用的. --> <bean id="configProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="locations"> <list> <value>classpath:kafka.properties</value> </list> </property> </bean> <!-- kafka --> <import resource="applicationContext-kafka-producer.xml"/> <import resource="applicationContext-kafka-receiver.xml"/> </beans>
applicationContext-kafka-producer.xml
<bean id="topProducer" class="top.lilixin.TopProducer"> <constructor-arg index="0" value="${metadata.broker.list}" /> </bean>
applicationContext-kafka-receiver.xml
<!-- 定义消息处理器 --> <bean id="testConsumer" class="top.lilixin.TestConsumer"></bean> <!-- 定义收信人 receiver --> <bean id="topReceiver" class="top.lilixin.TopReceiver"> <constructor-arg index="0" value="${zookeeper.connect}" /><!-- _zookeeper集群地址,如: zkserver1.vko.cn:2181,zkserver2.vko.cn:2181,zkserver3.vko.cn:2181_ --> <constructor-arg index="1" value="${test.group.id}" /><!-- _消费者所属组id字符串 ,如:vko_group_article_read_count_ --> <constructor-arg index="2" value="${kafka.test.topics}" /><!-- _要消费的消息主题,如:vko_group_ --> <constructor-arg index="3" ref="testConsumer" /> <!--_上面定义的消息处理器_ --> </bean>
项目原代码中TopReceiver.java有点小问题:服务关闭后,重启服务会重复读取消息。
原代码:
// 目前每个topic都是2个分区 topicCountMap.put(topic,2); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); for (final KafkaStream<byte[], byte[]> stream : streams) { new Thread(){ public void run(){ ConsumerIterator<byte[], byte[]> it = stream.iterator(); while(it.hasNext()){ String msg = new String(it.next().message()); try{ topConsumer.dealMsg(msg); }catch(Exception e){ log.error("kafka vkoConsumer topic:{} 收到消息:{} 消费异常 xxxxxxxxxxxxxxxxxx", topic, msg,e); } log.info("kafka vkoConsumer topic:{} 收到消息:{}", topic, msg); } } }.start(); log.info("kafka vkoConsumer 启动完成:groupId:{},topic:{},zookeeperConnect:{}",groupId, topic, zookeeperConnect); }
首先要知道的是,High Level Consumer在ZooKeeper上保存最新的offset(从指定的分区中读取)。这个offset基于consumer group名存储。
Consumer group名在Kafka集群上是全局性的,在启动新的consumer group的时候要小心集群上没有关闭的consumer。当一个consumer线程启动了,Kafka会将它加入到相同的topic下的相同consumer group里,并且触发重新分配。在重新分配时,Kafka将partition分配给consumer,有可能会移动一个partition给另一个consumer。如果老的、新的处理逻辑同时存在,有可能一些消息传递到了老的consumer上。
使用High Level Consumer,它应该是多线程的。消费者线程的数量跟tipic的partition数量有关,它们之间有一些特定的规则:
- 如果线程数量大于主题的分区数量,一些线程将得不到任何消息
- 如果分区数大于线程数,一些线程将得到多个分区的消息
- 如果一个线程处理多个分区的消息,它接收到消息的顺序是不能保证的。比如,先从分区10获取了5条消息,从分区11获取了6条消息,然后从分区10获取了5条,紧接着又从分区10获取了5条,虽然分区11还有消息。
- 添加更多了同consumer group的consumer将触发Kafka重新分配,某个分区本来分配给a线程的,从新分配后,有可能分配给了b线程。
Kafka不会再每次读取消息后马上更新zookeeper上的offset,而是等待一段时间。由于这种延迟,有可能消费者读取了一条消息,但没有更新offset。所以,当客户端关闭或崩溃后,从新启动时有些消息重复读取了。另外,broker宕机或其他原因导致更换了partition的leader,也会导致消息重复读取。
为了避免这种问题,你应该提供一个平滑的关闭方式,而不是使用kill -9
修改后:
private static final int THREAD_AMOUNT = 2; …… // 目前每个topic都是2个分区 topicCountMap.put(topic,THREAD_AMOUNT); Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = cc.createMessageStreams(topicCountMap); List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); ExecutorService executor = Executors.newFixedThreadPool(THREAD_AMOUNT); //使用ExecutorService来调度线程 for (int i = 0; i < streams.size(); i++) { KafkaStream<byte[], byte[]> kafkaStream = streams.get(i); executor.submit(new HanldMessageThread(kafkaStream, i)); log.info("kafka vkoConsumer 启动完成:groupId:"+groupId+",topic:"+topic+",zookeeperConnect:"+zookeeperConnect); } //关闭consumer try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } if (cc != null) { cc.shutdown(); } if (executor != null) { executor.shutdown(); } try { if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) { log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly"); } } catch (InterruptedException e) { log.info("Interrupted during shutdown, exiting uncleanly"); } …… /** * 具体处理message的线程 * @author Administrator * */ class HanldMessageThread implements Runnable { private KafkaStream<byte[], byte[]> kafkaStream = null; private int num = 0; public HanldMessageThread(KafkaStream<byte[], byte[]> kafkaStream, int num) { super(); this.kafkaStream = kafkaStream; this.num = num; } public void run() { ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator(); while(iterator.hasNext()) { String message = new String(iterator.next().message()); System.out.println("Thread no: " + num + ", message: " + message); } } }
具体参照:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
补充:非常不错的Kafka教程
http://orchome.com/kafka/index
相关推荐
Kafka 环境搭建 Kafka 是一个distributed ...搭建 Kafka 环境需要安装 Java 环境、Python 环境、Zookeeper 和 Kafka 服务,并配置相关的环境变量和配置文件。通过这些步骤,可以成功搭建一个单机的 Kafka 环境。
liunx系统下,组件服务搭建,安装必备.Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据
本篇文档将详细介绍如何在Linux环境中搭建Kafka集群,同时结合Hadoop和Spark的分布式安装,构建一个完整的数据处理平台。 首先,搭建Kafka集群的基础条件包括: 1. 至少一台Linux服务器,但为了高可用性,推荐多台...
【Kafka集群搭建及测试】 Kafka是一种分布式流处理平台,常用于实时数据处理和大数据管道。本文档将详细介绍如何在三台Ubuntu 16虚拟机上搭建Kafka集群,并进行基本的测试,确保其正常运行。 **1. 准备工作** 在...
有时需要将kafka集成到vc环境中,应用于某种场景。 本资源是将kafka cpp库封装成生产者,...运行需要依赖搭建好的kafka服务器 搭建kafka服务器的文章可以从网上获取。 如果需要帮助请email联系:licence3519@126.com
kafka集群搭建方案 kafka集群搭建是大数据处理和实时数据处理的重要组件。下面是kafka集群搭建的详细方案: 一、准备工作 1. 关闭防火墙 关闭防火墙是kafka集群搭建的前提条件。可以使用systemctl disable ...
Kafka 集群搭建与使用 Kafka 是一种高吞吐量的分布式发布订阅消息系统,使用 Scala 编写。Kafka 拥有作为一个消息系统应该具备的功能,但是确有着独特的设计。Kafka 集群的搭建和使用是基于 Kafka 的设计理念和架构...
5. **启动服务**: 先启动ZooKeeper,然后启动Kafka的每个节点。 6. **创建主题**:使用Kafka命令行工具`bin/kafka-topics.sh`创建主题,例如`kafka-create-topic.sh --topic my-topic --partitions 3 --replication...
搭建Kafka集群涉及到对虚拟机的安装配置、JDK环境的搭建、Zookeeper的安装配置等关键步骤。下面详细介绍各个知识点。 首先,虚拟机的安装是搭建Kafka集群的基础。文中提到了使用VMWare来安装三台虚拟机,并分配了...
Kafka 集群搭建与使用详解 Kafka 是一种分布式流媒体平台,由 Apache 开源项目提供。它主要用来构建实时数据管道和流媒体处理系统。本文档将详细介绍 Kafka 集群的搭建和使用,包括创建、删除、生产者、消费者等...
3. **启动Kafka**:在配置完成后,通过命令行启动Zookeeper(Kafka依赖的协调服务)和Kafka服务器。 4. **创建主题**:使用Kafka的管理工具`kafka-topics.sh`创建主题,指定主题名、分区数和副本数。 5. **生产者...
### Kafka环境搭建与Spring整合详解 #### 一、Kafka基本概念 Kafka是一款开源的分布式消息系统,它能够提供高吞吐量的数据管道和存储服务。为了更好地理解和使用Kafka,我们首先需要了解以下几个核心概念: 1. **...
kafka分布式集群多服务器和单机部署,需安装zookeeper环境,
【Kafka集群搭建】Kafka是一款高吞吐量的分布式发布订阅消息系统,广泛应用于大数据实时处理和流计算领域。搭建Kafka集群是构建可靠、高效的数据传输平台的关键步骤。根据提供的信息,Kafka集群搭建有两种方式:在多...
【Kafka集群搭建详解】 Apache Kafka是一个分布式流处理平台,常用于实时数据处理和消息传递。本教程将详细介绍如何在CentOS 6.5环境下搭建Kafka集群,使用的版本为kafka_2.10-0.10.0.0,依赖JDK 1.8.0_172。集群将...
【Kafka环境搭建详解】 Kafka是一款分布式流处理平台,常用于实时数据处理和消息传递。本文将详细介绍如何搭建一个Kafka集群,包括基础的安装、配置和启动步骤。 **一、Kafka集群搭建** 1. **安装与解压** - ...
### Kafka分布式集群搭建详解 #### 一、概述 Kafka是一种高性能、分布式的消息发布与订阅系统,被广泛应用于日志收集、流处理、消息传递等多个领域。为了提高系统的可用性与扩展性,通常会采用分布式集群的方式...
【Kafka环境搭建】 Kafka是一款分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。它被设计用来处理大规模实时数据流,提供高吞吐量的发布订阅消息系统,同时具备容错性和可扩展性。 **单机环境搭建...