`

(转)Kafka部署与代码实例

 
阅读更多

转 http://shift-alt-ctrl.iteye.com/blog/1930791

 

 

kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.

    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.

    其中kafka为0.8V,zookeeper为3.4.5V

 

一.Zookeeper集群构建

    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.

    1) zk-0

    调整配置文件:

Php代码  收藏代码
  1. clientPort=2181  
  2. server.0=127.0.0.1:2888:3888  
  3. server.1=127.0.0.1:2889:3889  
  4. server.2=127.0.0.1:2890:3890  
  5. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

Java代码  收藏代码
  1. ./zkServer.sh start  

    2) zk-1

    调整配置文件(其他配置和zk-0一只):

Php代码  收藏代码
  1. clientPort=2182  
  2. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

 

Java代码  收藏代码
  1. ./zkServer.sh start  

    3) zk-2

    调整配置文件(其他配置和zk-0一只):

Php代码  收藏代码
  1. clientPort=2183  
  2. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

 

Java代码  收藏代码
  1. ./zkServer.sh start  

  

二. Kafka集群构建

    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.

    1) kafka-0

    在config目录下修改配置文件为:

Java代码  收藏代码
  1. broker.id=0  
  2. port=9092  
  3. num.network.threads=2  
  4. num.io.threads=2  
  5. socket.send.buffer.bytes=1048576  
  6. socket.receive.buffer.bytes=1048576  
  7. socket.request.max.bytes=104857600  
  8. log.dir=./logs  
  9. num.partitions=2  
  10. log.flush.interval.messages=10000  
  11. log.flush.interval.ms=1000  
  12. log.retention.hours=168  
  13. #log.retention.bytes=1073741824  
  14. log.segment.bytes=536870912  
  15. ##replication机制,让每个topic的partitions在kafka-cluster中备份2个  
  16. ##用来提高cluster的容错能力..  
  17. default.replication.factor=1  
  18. log.cleanup.interval.mins=10  
  19. zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
  20. zookeeper.connection.timeout.ms=1000000  

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

Java代码  收藏代码
  1. > cd kafka-0  
  2. > ./sbt update  
  3. > ./sbt package  
  4. > ./sbt assembly-package-dependency   

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:

Java代码  收藏代码
  1. > JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &  

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.

    2) kafka-1

Java代码  收藏代码
  1. broker.id=1  
  2. port=9093  
  3. ##其他配置和kafka-0保持一致  

    然后和kafka-0一样执行打包命令,然后启动此broker.

Java代码  收藏代码
  1. > JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &  

    仍然可以通过如下指令查看topic的"partition"/"replicas"的分布和存活情况.

Java代码  收藏代码
  1. > bin/kafka-list-topic.sh --zookeeper localhost:2181  
  2. topic: my-replicated-topic  partition: 0    leader: 2   replicas: 1,2,0 isr: 2  
  3. topic: test partition: 0    leader: 0   replicas: 0 isr: 0   

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。[配置参数详解]

 

三.项目准备

    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.

Java代码  收藏代码
  1. <dependencies>  
  2.     <dependency>  
  3.         <groupId>log4j</groupId>  
  4.         <artifactId>log4j</artifactId>  
  5.         <version>1.2.14</version>  
  6.     </dependency>  
  7.     <dependency>  
  8.         <groupId>org.apache.kafka</groupId>  
  9.         <artifactId>kafka_2.8.2</artifactId>  
  10.         <version>0.8.0</version>  
  11.         <exclusions>  
  12.             <exclusion>  
  13.                 <groupId>log4j</groupId>  
  14.                 <artifactId>log4j</artifactId>  
  15.             </exclusion>  
  16.         </exclusions>  
  17.     </dependency>  
  18.     <dependency>  
  19.         <groupId>org.scala-lang</groupId>  
  20.         <artifactId>scala-library</artifactId>  
  21.         <version>2.8.2</version>  
  22.     </dependency>  
  23.     <dependency>  
  24.         <groupId>com.yammer.metrics</groupId>  
  25.         <artifactId>metrics-core</artifactId>  
  26.         <version>2.2.0</version>  
  27.     </dependency>  
  28.     <dependency>  
  29.         <groupId>com.101tec</groupId>  
  30.         <artifactId>zkclient</artifactId>  
  31.         <version>0.3</version>  
  32.     </dependency>  
  33. </dependencies>  

 

四.Producer端代码

    1) producer.properties文件:此文件放在/resources目录下

Java代码  收藏代码
  1. #partitioner.class=  
  2. ##broker列表可以为kafka server的子集,因为producer需要从broker中获取metadata  
  3. ##尽管每个broker都可以提供metadata,此处还是建议,将所有broker都列举出来  
  4. ##此值,我们可以在spring中注入过来  
  5. ##metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093  
  6. ##,127.0.0.1:9093  
  7. ##同步,建议为async  
  8. producer.type=sync  
  9. compression.codec=0  
  10. serializer.class=kafka.serializer.StringEncoder  
  11. ##在producer.type=async时有效  
  12. #batch.num.messages=100  

    2) KafkaProducerClient.java代码样例

Java代码  收藏代码
  1. import java.util.ArrayList;  
  2. import java.util.Collection;  
  3. import java.util.List;  
  4. import java.util.Properties;  
  5.   
  6. import kafka.javaapi.producer.Producer;  
  7. import kafka.producer.KeyedMessage;  
  8. import kafka.producer.ProducerConfig;  
  9.   
  10. /** 
  11.  * User: guanqing-liu 
  12.  */  
  13. public class KafkaProducerClient {  
  14.   
  15.     private Producer<String, String> inner;  
  16.       
  17.     private String brokerList;//for metadata discovery,spring setter  
  18.     private String location = "kafka-producer.properties";//spring setter  
  19.       
  20.     private String defaultTopic;//spring setter  
  21.   
  22.     public void setBrokerList(String brokerList) {  
  23.         this.brokerList = brokerList;  
  24.     }  
  25.   
  26.     public void setLocation(String location) {  
  27.         this.location = location;  
  28.     }  
  29.   
  30.     public void setDefaultTopic(String defaultTopic) {  
  31.         this.defaultTopic = defaultTopic;  
  32.     }  
  33.   
  34.     public KafkaProducerClient(){}  
  35.       
  36.     public void init() throws Exception {  
  37.         Properties properties = new Properties();  
  38.         properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));  
  39.           
  40.           
  41.         if(brokerList != null) {  
  42.             properties.put("metadata.broker.list", brokerList);  
  43.         }  
  44.   
  45.         ProducerConfig config = new ProducerConfig(properties);  
  46.         inner = new Producer<String, String>(config);  
  47.     }  
  48.   
  49.     public void send(String message){  
  50.         send(defaultTopic,message);  
  51.     }  
  52.       
  53.     public void send(Collection<String> messages){  
  54.         send(defaultTopic,messages);  
  55.     }  
  56.       
  57.     public void send(String topicName, String message) {  
  58.         if (topicName == null || message == null) {  
  59.             return;  
  60.         }  
  61.         KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);  
  62.         inner.send(km);  
  63.     }  
  64.   
  65.     public void send(String topicName, Collection<String> messages) {  
  66.         if (topicName == null || messages == null) {  
  67.             return;  
  68.         }  
  69.         if (messages.isEmpty()) {  
  70.             return;  
  71.         }  
  72.         List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();  
  73.         int i= 0;  
  74.         for (String entry : messages) {  
  75.             KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);  
  76.             kms.add(km);  
  77.             i++;  
  78.             if(i % 20 == 0){  
  79.                 inner.send(kms);  
  80.                 kms.clear();  
  81.             }  
  82.         }  
  83.           
  84.         if(!kms.isEmpty()){  
  85.             inner.send(kms);  
  86.         }  
  87.     }  
  88.   
  89.     public void close() {  
  90.         inner.close();  
  91.     }  
  92.   
  93.     /** 
  94.      * @param args 
  95.      */  
  96.     public static void main(String[] args) {  
  97.         KafkaProducerClient producer = null;  
  98.         try {  
  99.             producer = new KafkaProducerClient();  
  100.             //producer.setBrokerList("");  
  101.             int i = 0;  
  102.             while (true) {  
  103.                 producer.send("test-topic""this is a sample" + i);  
  104.                 i++;  
  105.                 Thread.sleep(2000);  
  106.             }  
  107.         } catch (Exception e) {  
  108.             e.printStackTrace();  
  109.         } finally {  
  110.             if (producer != null) {  
  111.                 producer.close();  
  112.             }  
  113.         }  
  114.   
  115.     }  
  116.   
  117. }  

    3) spring配置

Java代码  收藏代码
  1. <bean id="kafkaProducerClient" class="com.test.kafka.KafkaProducerClient" init-method="init" destroy-method="close">  
  2.     <property name="zkConnect" value="${zookeeper_cluster}"></property>  
  3.     <property name="defaultTopic" value="${kafka_topic}"></property>  
  4. </bean>  

 

五.Consumer端

     1) consumer.properties:文件位于/resources目录下

Java代码  收藏代码
  1. ## 此值可以配置,也可以通过spring注入  
  2. ##zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
  3. ##,127.0.0.1:2182,127.0.0.1:2183  
  4. # timeout in ms for connecting to zookeeper  
  5. zookeeper.connectiontimeout.ms=1000000  
  6. #consumer group id  
  7. group.id=test-group  
  8. #consumer timeout  
  9. #consumer.timeout.ms=5000  
  10. auto.commit.enable=true  
  11. auto.commit.interval.ms=60000  

    2) KafkaConsumerClient.java代码样例

Java代码  收藏代码
  1. package com.test.kafka;  
  2. import java.nio.ByteBuffer;  
  3. import java.nio.CharBuffer;  
  4. import java.nio.charset.Charset;  
  5. import java.util.HashMap;  
  6. import java.util.List;  
  7. import java.util.Map;  
  8. import java.util.Properties;  
  9. import java.util.concurrent.ExecutorService;  
  10. import java.util.concurrent.Executors;  
  11.   
  12. import kafka.consumer.Consumer;  
  13. import kafka.consumer.ConsumerConfig;  
  14. import kafka.consumer.ConsumerIterator;  
  15. import kafka.consumer.KafkaStream;  
  16. import kafka.javaapi.consumer.ConsumerConnector;  
  17. import kafka.message.Message;  
  18. import kafka.message.MessageAndMetadata;  
  19.   
  20. /** 
  21.  * User: guanqing-liu  
  22.  */  
  23. public class KafkaConsumerClient {  
  24.   
  25.     private String groupid; //can be setting by spring  
  26.     private String zkConnect;//can be setting by spring  
  27.     private String location = "kafka-consumer.properties";//配置文件位置  
  28.     private String topic;  
  29.     private int partitionsNum = 1;  
  30.     private MessageExecutor executor; //message listener  
  31.     private ExecutorService threadPool;  
  32.       
  33.     private ConsumerConnector connector;  
  34.       
  35.     private Charset charset = Charset.forName("utf8");  
  36.   
  37.     public void setGroupid(String groupid) {  
  38.         this.groupid = groupid;  
  39.     }  
  40.   
  41.     public void setZkConnect(String zkConnect) {  
  42.         this.zkConnect = zkConnect;  
  43.     }  
  44.   
  45.     public void setLocation(String location) {  
  46.         this.location = location;  
  47.     }  
  48.   
  49.     public void setTopic(String topic) {  
  50.         this.topic = topic;  
  51.     }  
  52.   
  53.     public void setPartitionsNum(int partitionsNum) {  
  54.         this.partitionsNum = partitionsNum;  
  55.     }  
  56.   
  57.     public void setExecutor(MessageExecutor executor) {  
  58.         this.executor = executor;  
  59.     }  
  60.   
  61.     public KafkaConsumerClient() {}  
  62.   
  63.     //init consumer,and start connection and listener  
  64.     public void init() throws Exception {  
  65.         if(executor == null){  
  66.             throw new RuntimeException("KafkaConsumer,exectuor cant be null!");  
  67.         }  
  68.         Properties properties = new Properties();  
  69.         properties.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));  
  70.           
  71.         if(groupid != null){  
  72.             properties.put("groupid", groupid);  
  73.         }  
  74.         if(zkConnect != null){  
  75.             properties.put("zookeeper.connect", zkConnect);  
  76.         }  
  77.         ConsumerConfig config = new ConsumerConfig(properties);  
  78.   
  79.         connector = Consumer.createJavaConsumerConnector(config);  
  80.         Map<String, Integer> topics = new HashMap<String, Integer>();  
  81.         topics.put(topic, partitionsNum);  
  82.         Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);  
  83.         List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);  
  84.         threadPool = Executors.newFixedThreadPool(partitionsNum * 2);  
  85.           
  86.         //start  
  87.         for (KafkaStream<byte[], byte[]> partition : partitions) {  
  88.             threadPool.execute(new MessageRunner(partition));  
  89.         }  
  90.     }  
  91.   
  92.     public void close() {  
  93.         try {  
  94.             threadPool.shutdownNow();  
  95.         } catch (Exception e) {  
  96.             //  
  97.         } finally {  
  98.             connector.shutdown();  
  99.         }  
  100.   
  101.     }  
  102.   
  103.     class MessageRunner implements Runnable {  
  104.         private KafkaStream<byte[], byte[]> partition;  
  105.   
  106.         MessageRunner(KafkaStream<byte[], byte[]> partition) {  
  107.             this.partition = partition;  
  108.         }  
  109.   
  110.         public void run() {  
  111.             ConsumerIterator<byte[], byte[]> it = partition.iterator();  
  112.             while (it.hasNext()) {  
  113.                 // connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用  
  114.                 MessageAndMetadata<byte[], byte[]> item = it.next();  
  115.                 try{  
  116.                     executor.execute(new String(item.message(),charset));// UTF-8,注意异常  
  117.                 }catch(Exception e){  
  118.                     //  
  119.                 }  
  120.             }  
  121.         }  
  122.           
  123.         public String getContent(Message message){  
  124.             ByteBuffer buffer = message.payload();  
  125.             if (buffer.remaining() == 0) {  
  126.                 return null;  
  127.             }  
  128.             CharBuffer charBuffer = charset.decode(buffer);  
  129.             return charBuffer.toString();  
  130.         }  
  131.     }  
  132.   
  133.     public static interface MessageExecutor {  
  134.   
  135.         public void execute(String message);  
  136.     }  
  137.   
  138.     /** 
  139.      * @param args 
  140.      */  
  141.     public static void main(String[] args) {  
  142.         KafkaConsumerClient consumer = null;  
  143.         try {  
  144.             MessageExecutor executor = new MessageExecutor() {  
  145.   
  146.                 public void execute(String message) {  
  147.                     System.out.println(message);  
  148.                 }  
  149.             };  
  150.             consumer = new KafkaConsumerClient();  
  151.               
  152.             consumer.setTopic("test-topic");  
  153.             consumer.setPartitionsNum(2);  
  154.             consumer.setExecutor(executor);  
  155.             consumer.init();  
  156.         } catch (Exception e) {  
  157.             e.printStackTrace();  
  158.         } finally {  
  159.              if(consumer != null){  
  160.                  consumer.close();  
  161.              }  
  162.         }  
  163.   
  164.     }  
  165.   
  166. }  

    3) spring配置(略)

 

    需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.

    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

分享到:
评论

相关推荐

    jstorm集成kafka代码实例

    总的来说,"jstorm集成kafka代码实例"是一个实践性的教程,帮助开发者了解如何在Java环境中将JStorm和Kafka结合,构建实时数据处理系统。通过这个例子,你不仅可以学习到如何编写相关代码,还能理解两个组件协同工作...

    kafka学习实例

    **Kafka学习实例** 在大数据实时处理领域,Apache Kafka是一个不可或缺的组件,它是一个分布式流处理平台,用于构建实时数据管道和流应用。本实例主要围绕Kafka的集群配置、生产和消费实例进行深入探讨。 首先,...

    storm+kafka+jdbc整合实例

    6. **测试与监控**:部署并启动 Storm 拓扑后,你需要监控系统的运行状态,确保数据正确地从 Kafka 流入 Storm,并最终通过 JDBC 存储到数据库中。可以使用 Storm UI 或其他监控工具进行监控。 在“StormKafkaJdbc...

    kafka 可运行实例

    6. **Kafka Connect**:这是一个用于集成Kafka与其他系统的工具,可以方便地导入和导出数据,比如从数据库到Kafka,或者从Kafka到数据仓库。 7. **Zookeeper**:Kafka依赖Zookeeper进行集群协调,包括管理broker、...

    Kafka实例Kafka实例

    在IT行业中,Kafka是一个非常重要的分布式流处理平台,由LinkedIn开发并贡献给了Apache软件基金会。Kafka实例是实现大规模实时数据处理...通过实践Kafka-Demo,你将更深入地了解如何在实际环境中部署和操作Kafka实例。

    Kafka .Net Framework4.0 版本

    使用Kafka .Net库,你可以创建生产者和消费者实例,它们是与Kafka集群交互的基本单元。生产者负责生成消息并将其发布到特定的主题,而消费者则订阅这些主题并处理接收到的消息。Kafka .Net支持同步和异步消息发送,...

    kafka 实例

    这个"Kafka实例"的压缩包文件很可能是提供了一个实际的开发环境,包括消费者和生产者的Java代码示例,适用于在IntelliJ IDEA这样的集成开发环境中运行的Maven项目。下面将详细讨论Kafka、消费者、生产者以及与Maven...

    kafka实例 消费者生产者

    生产者通过API与Kafka集群交互,选择消息的存储位置,并确保数据的顺序。 **消费者**则是从Kafka主题中读取并处理数据的应用程序。Kafka支持多消费者组,每个组内的消费者可以并行消费消息,实现负载均衡。消费者...

    kafka环境部署安装包

    - 集群部署时,需要配置多个Kafka和Zookeeper实例,并在配置文件中正确设置集群间的连接信息。 通过以上步骤,你可以在本地环境中成功部署Kafka和Zookeeper,为你的大数据实时处理需求打下基础。记得根据实际情况...

    kafka模拟生产者消费者(集群模式)实例

    集群模式是Kafka的一种部署方式,它可以提供冗余和负载均衡。在集群中,多个Kafka broker(服务器)共同分发和存储消息,确保服务的高可用性和容错性。 要实现集群模式的Kafka生产者和消费者,你需要以下步骤: 1....

    kafka学习笔记.doc

    10. 代码实践《kafka学习代码》中可能包含了一些示例,涵盖了producer、consumer的创建、消息发送与接收、配置调整等内容,是深入理解Kafka工作原理和使用技巧的重要参考资料。 总结,Kafka作为一个分布式流处理...

    Kafka Streams1.zip

    - **测试与部署**:单元测试代码,然后将应用部署到生产环境。 5. **Kafka Streams的应用场景**: - **实时数据分析**:实时监控系统性能、用户行为分析等。 - **ETL流程**:从不同的数据源抽取数据,转换后加载...

    kafka_2.11-2.0.0.tgz

    - **生产者示例**:使用Kafka的Java API,编写生产者代码,通过new Producer()创建实例,然后使用send()方法发送消息到指定主题。 - **消费者示例**:创建消费者实例,通过subscribe()订阅主题,然后使用poll()...

    Kafka学习资料.pdf

    代码实例在Kafka学习资料中可能会展示如何配置Producer和Consumer,如何创建Topic和Partition,如何设置和管理消费者组,以及如何通过Kafka事务来保证消息的原子性。这些代码示例将会是理解Kafka实际操作的基础。 ...

    阿里云消息队列kafka demo

    Scala代码中,开发者将实现一个类,该类连接到Kafka集群,创建Producer实例,并发送消息到指定的Topic。这通常涉及配置Producer的属性,如bootstrap服务器列表、序列化方式等。 `Consumer`则是接收和处理消息的一端...

    springboot集成kafka进行消息发布和订阅jar

    **SpringBoot集成Kafka进行消息发布与订阅** 在现代微服务架构中,消息队列(Message Queue)扮演着至关重要的角色,它能够有效地解耦系统组件,提高系统的响应速度和可扩展性。Apache Kafka作为一款高吞吐量、低...

    storm集成kafka插demo.zip

    【标题】"storm集成kafka插demo.zip"指的是一个演示如何将Apache Storm与Apache Kafka集成的实例项目。这个压缩包包含了一个示例,用于展示如何在Storm拓扑中消费和处理Kafka的消息。 【描述】"storm集成kafka插件...

    spring-kafka

    这里提到的 "zk" 可能是指配置 Kafka 时需要的 Zookeeper 实例,虽然现在大多数新部署的 Kafka 集群使用内置的元数据管理。 **2. Spring Kafka 的配置** 配置 Spring Kafka 主要涉及以下几个方面: - **Bootstrap...

    kafka安装相关文件以及java调用kafka示例项目

    Kafka的部署需要一个Zookeeper集群,因为Zookeeper是Kafka的核心依赖,用于协调集群中的各个节点。因此,提供的zookeeper安装包也是必不可少的。Zookeeper是一个分布式的、开放源码的协调服务,它为分布式应用提供了...

    win32 C++ kafka 库

    2. 创建Kafka生产者实例,配置参数如服务器地址、主题等: ```cpp KafkaProducer producer("localhost:9092"); ``` 3. 创建消息,设置消息属性: ```cpp KafkaMessage message("my-topic", "Hello, Kafka!"); ...

Global site tag (gtag.js) - Google Analytics