`
ssxxjjii
  • 浏览: 950074 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

消息系统kafka介绍

 
阅读更多

http://dongxicheng.org/search-engine/kafka/

http://shift-alt-ctrl.iteye.com/blog/1930791

kafka作为分布式日志收集或系统监控服务,我们有必要在合适的场合使用它。kafka的部署包括zookeeper环境/kafka环境,同时还需要进行一些配置操作.接下来介绍如何使用kafka.

    我们使用3个zookeeper实例构建zk集群,使用2个kafka broker构建kafka集群.

    其中kafka为0.8V,zookeeper为3.4.5V

 

一.Zookeeper集群构建

    我们有3个zk实例,分别为zk-0,zk-1,zk-2;如果你仅仅是测试使用,可以使用1个zk实例.

    1) zk-0

    调整配置文件:

Php代码  收藏代码
  1. clientPort=2181  
  2. server.0=127.0.0.1:2888:3888  
  3. server.1=127.0.0.1:2889:3889  
  4. server.2=127.0.0.1:2890:3890  
  5. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

Java代码  收藏代码
  1. ./zkServer.sh start  

    2) zk-1

    调整配置文件(其他配置和zk-0一只):

Php代码  收藏代码
  1. clientPort=2182  
  2. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

 

Java代码  收藏代码
  1. ./zkServer.sh start  

    3) zk-2

    调整配置文件(其他配置和zk-0一只):

Php代码  收藏代码
  1. clientPort=2183  
  2. ##只需要修改上述配置,其他配置保留默认值  

    启动zookeeper

 

Java代码  收藏代码
  1. ./zkServer.sh start  

  

二. Kafka集群构建

    因为Broker配置文件涉及到zookeeper的相关约定,因此我们先展示broker配置文件.我们使用2个kafka broker来构建这个集群环境,分别为kafka-0,kafka-1.

    1) kafka-0

    在config目录下修改配置文件为:

Java代码  收藏代码
  1. broker.id=0  
  2. port=9092  
  3. num.network.threads=2  
  4. num.io.threads=2  
  5. socket.send.buffer.bytes=1048576  
  6. socket.receive.buffer.bytes=1048576  
  7. socket.request.max.bytes=104857600  
  8. log.dir=./logs  
  9. num.partitions=2  
  10. log.flush.interval.messages=10000  
  11. log.flush.interval.ms=1000  
  12. log.retention.hours=168  
  13. #log.retention.bytes=1073741824  
  14. log.segment.bytes=536870912  
  15. num.replica.fetchers=2  
  16. log.cleanup.interval.mins=10  
  17. zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
  18. zookeeper.connection.timeout.ms=1000000  
  19. kafka.metrics.polling.interval.secs=5  
  20. kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  
  21. kafka.csv.metrics.dir=/tmp/kafka_metrics  
  22. kafka.csv.metrics.reporter.enabled=false  

    因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

Java代码  收藏代码
  1. > cd kafka-0  
  2. > ./sbt update  
  3. > ./sbt package  
  4. > ./sbt assembly-package-dependency   

    其中最后一条指令执行有可能出现异常,暂且不管。 启动kafka broker:

Java代码  收藏代码
  1. > JMS_PORT=9997 bin/kafka-server-start.sh config/server.properties &  

    因为zookeeper环境已经正常运行了,我们无需通过kafka来挂载启动zookeeper.如果你的一台机器上部署了多个kafka broker,你需要声明JMS_PORT.

    2) kafka-1

Java代码  收藏代码
  1. broker.id=1  
  2. port=9093  
  3. ##其他配置和kafka-0保持一致  

    然后和kafka-0一样执行打包命令,然后启动此broker.

Java代码  收藏代码
  1. > JMS_PORT=9998 bin/kafka-server-start.sh config/server.properties &  

    到目前为止环境已经OK了,那我们就开始展示编程实例吧。

 

三.项目准备

    项目基于maven构建,不得不说kafka java客户端实在是太糟糕了;构建环境会遇到很多麻烦。建议参考如下pom.xml;其中各个依赖包必须版本协调一致。如果kafka client的版本和kafka server的版本不一致,将会有很多异常,比如"broker id not exists"等;因为kafka从0.7升级到0.8之后(正名为2.8.0),client与server通讯的protocol已经改变.

Java代码  收藏代码
  1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
  2.          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">  
  3.     <modelVersion>4.0.0</modelVersion>  
  4.     <groupId>com.test</groupId>  
  5.     <artifactId>test-kafka</artifactId>  
  6.     <packaging>jar</packaging>  
  7.   
  8.     <name>test-kafka</name>  
  9.     <url>http://maven.apache.org</url>  
  10.     <version>1.0.0</version>  
  11.     <dependencies>  
  12.         <dependency>  
  13.             <groupId>log4j</groupId>  
  14.             <artifactId>log4j</artifactId>  
  15.             <version>1.2.14</version>  
  16.         </dependency>  
  17.         <dependency>  
  18.             <groupId>org.apache.kafka</groupId>  
  19.             <artifactId>kafka_2.8.0</artifactId>  
  20.             <version>0.8.0-beta1</version>  
  21.             <exclusions>  
  22.                 <exclusion>  
  23.                     <groupId>log4j</groupId>  
  24.                     <artifactId>log4j</artifactId>  
  25.                 </exclusion>  
  26.             </exclusions>  
  27.         </dependency>  
  28.         <dependency>  
  29.             <groupId>org.scala-lang</groupId>  
  30.             <artifactId>scala-library</artifactId>  
  31.             <version>2.8.1</version>  
  32.         </dependency>  
  33.         <dependency>  
  34.             <groupId>com.yammer.metrics</groupId>  
  35.             <artifactId>metrics-core</artifactId>  
  36.             <version>2.2.0</version>  
  37.         </dependency>  
  38.         <dependency>  
  39.             <groupId>com.101tec</groupId>  
  40.             <artifactId>zkclient</artifactId>  
  41.             <version>0.3</version>  
  42.         </dependency>  
  43.     </dependencies>  
  44.     <build>  
  45.         <finalName>test-kafka-1.0</finalName>  
  46.         <resources>  
  47.             <resource>  
  48.                 <directory>src/main/resources</directory>  
  49.                 <filtering>true</filtering>  
  50.             </resource>  
  51.         </resources>  
  52.         <plugins>  
  53.             <plugin>  
  54.                 <artifactId>maven-compiler-plugin</artifactId>  
  55.                 <version>2.3.2</version>  
  56.                 <configuration>  
  57.                     <source>1.5</source>  
  58.                     <target>1.5</target>  
  59.                     <encoding>gb2312</encoding>  
  60.                 </configuration>  
  61.             </plugin>  
  62.             <plugin>  
  63.                 <artifactId>maven-resources-plugin</artifactId>  
  64.                 <version>2.2</version>  
  65.                 <configuration>  
  66.                     <encoding>gbk</encoding>  
  67.                 </configuration>  
  68.             </plugin>  
  69.         </plugins>  
  70.     </build>  
  71. </project>  

 

四.Producer端代码

    1) producer.properties文件:此文件放在/resources目录下

Java代码  收藏代码
  1. #partitioner.class=  
  2. metadata.broker.list=127.0.0.1:9092,127.0.0.1:9093  
  3. ##,127.0.0.1:9093  
  4. producer.type=sync  
  5. compression.codec=0  
  6. serializer.class=kafka.serializer.StringEncoder  
  7. ##在producer.type=async时有效  
  8. #batch.num.messages=100  

    2) LogProducer.java代码样例

Java代码  收藏代码
  1. package com.test.kafka;  
  2.   
  3. import java.util.ArrayList;  
  4. import java.util.Collection;  
  5. import java.util.List;  
  6. import java.util.Properties;  
  7.   
  8. import kafka.javaapi.producer.Producer;  
  9. import kafka.producer.KeyedMessage;  
  10. import kafka.producer.ProducerConfig;  
  11. public class LogProducer {  
  12.   
  13.     private Producer<String,String> inner;  
  14.     public LogProducer() throws Exception{  
  15.         Properties properties = new Properties();  
  16.         properties.load(ClassLoader.getSystemResourceAsStream("producer.properties"));  
  17.         ProducerConfig config = new ProducerConfig(properties);  
  18.         inner = new Producer<String, String>(config);  
  19.     }  
  20.   
  21.       
  22.     public void send(String topicName,String message) {  
  23.         if(topicName == null || message == null){  
  24.             return;  
  25.         }  
  26.         KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,message);  
  27.         inner.send(km);  
  28.     }  
  29.       
  30.     public void send(String topicName,Collection<String> messages) {  
  31.         if(topicName == null || messages == null){  
  32.             return;  
  33.         }  
  34.         if(messages.isEmpty()){  
  35.             return;  
  36.         }  
  37.         List<KeyedMessage<String, String>> kms = new ArrayList<KeyedMessage<String, String>>();  
  38.         for(String entry : messages){  
  39.             KeyedMessage<String, String> km = new KeyedMessage<String, String>(topicName,entry);  
  40.             kms.add(km);  
  41.         }  
  42.         inner.send(kms);  
  43.     }  
  44.       
  45.     public void close(){  
  46.         inner.close();  
  47.     }  
  48.       
  49.     /** 
  50.      * @param args 
  51.      */  
  52.     public static void main(String[] args) {  
  53.         LogProducer producer = null;  
  54.         try{  
  55.             producer = new LogProducer();  
  56.             int i=0;  
  57.             while(true){  
  58.                 producer.send("test-topic""this is a sample" + i);  
  59.                 i++;  
  60.                 Thread.sleep(2000);  
  61.             }  
  62.         }catch(Exception e){  
  63.             e.printStackTrace();  
  64.         }finally{  
  65.             if(producer != null){  
  66.                 producer.close();  
  67.             }  
  68.         }  
  69.   
  70.     }  
  71.   
  72. }  

 

五.Consumer端

     1) consumer.properties:文件位于/resources目录下

Java代码  收藏代码
  1. zookeeper.connect=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183  
  2. ##,127.0.0.1:2182,127.0.0.1:2183  
  3. # timeout in ms for connecting to zookeeper  
  4. zookeeper.connectiontimeout.ms=1000000  
  5. #consumer group id  
  6. group.id=test-group  
  7. #consumer timeout  
  8. #consumer.timeout.ms=5000  
  9. auto.commit.enable=true  
  10. auto.commit.interval.ms=60000  

    2) LogConsumer.java代码样例

Java代码  收藏代码
  1. package com.test.kafka;  
  2.   
  3. import java.util.HashMap;  
  4. import java.util.List;  
  5. import java.util.Map;  
  6. import java.util.Properties;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9.   
  10. import kafka.consumer.Consumer;  
  11. import kafka.consumer.ConsumerConfig;  
  12. import kafka.consumer.ConsumerIterator;  
  13. import kafka.consumer.KafkaStream;  
  14. import kafka.javaapi.consumer.ConsumerConnector;  
  15. import kafka.message.MessageAndMetadata;  
  16. public class LogConsumer {  
  17.   
  18.     private ConsumerConfig config;  
  19.     private String topic;  
  20.     private int partitionsNum;  
  21.     private MessageExecutor executor;  
  22.     private ConsumerConnector connector;  
  23.     private ExecutorService threadPool;  
  24.     public LogConsumer(String topic,int partitionsNum,MessageExecutor executor) throws Exception{  
  25.         Properties properties = new Properties();  
  26.         properties.load(ClassLoader.getSystemResourceAsStream("consumer.properties"));  
  27.         config = new ConsumerConfig(properties);  
  28.         this.topic = topic;  
  29.         this.partitionsNum = partitionsNum;  
  30.         this.executor = executor;  
  31.     }  
  32.       
  33.     public void start() throws Exception{  
  34.         connector = Consumer.createJavaConsumerConnector(config);  
  35.         Map<String,Integer> topics = new HashMap<String,Integer>();  
  36.         topics.put(topic, partitionsNum);  
  37.         Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topics);  
  38.         List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);  
  39.         threadPool = Executors.newFixedThreadPool(partitionsNum);  
  40.         for(KafkaStream<byte[], byte[]> partition : partitions){  
  41.             threadPool.execute(new MessageRunner(partition));  
  42.         }   
  43.     }  
  44.   
  45.           
  46.     public void close(){  
  47.         try{  
  48.             threadPool.shutdownNow();  
  49.         }catch(Exception e){  
  50.             //  
  51.         }finally{  
  52.             connector.shutdown();  
  53.         }  
  54.           
  55.     }  
  56.       
  57.     class MessageRunner implements Runnable{  
  58.         private KafkaStream<byte[], byte[]> partition;  
  59.           
  60.         MessageRunner(KafkaStream<byte[], byte[]> partition) {  
  61.             this.partition = partition;  
  62.         }  
  63.           
  64.         public void run(){  
  65.             ConsumerIterator<byte[], byte[]> it = partition.iterator();  
  66.             while(it.hasNext()){  
  67.                                 //connector.commitOffsets();手动提交offset,当autocommit.enable=false时使用  
  68.                 MessageAndMetadata<byte[],byte[]> item = it.next();  
  69.                 System.out.println("partiton:" + item.partition());  
  70.                 System.out.println("offset:" + item.offset());  
  71.                 executor.execute(new String(item.message()));//UTF-8,注意异常  
  72.             }  
  73.         }  
  74.     }  
  75.       
  76.     interface MessageExecutor {  
  77.           
  78.         public void execute(String message);  
  79.     }  
  80.       
  81.     /** 
  82.      * @param args 
  83.      */  
  84.     public static void main(String[] args) {  
  85.         LogConsumer consumer = null;  
  86.         try{  
  87.             MessageExecutor executor = new MessageExecutor() {  
  88.                   
  89.                 public void execute(String message) {  
  90.                     System.out.println(message);  
  91.                       
  92.                 }  
  93.             };  
  94.             consumer = new LogConsumer("test-topic"2, executor);  
  95.             consumer.start();  
  96.         }catch(Exception e){  
  97.             e.printStackTrace();  
  98.         }finally{  
  99. //          if(consumer != null){  
  100. //              consumer.close();  
  101. //          }  
  102.         }  
  103.   
  104.     }  
  105.   
  106. }  

    需要提醒的是,上述LogConsumer类中,没有太多的关注异常情况,必须在MessageExecutor.execute()方法中抛出异常时的情况.

    在测试时,建议优先启动consumer,然后再启动producer,这样可以实时的观测到最新的消息。

 

分享到:
评论
3 楼 西巴拉古呀那 2018-02-21  
Kafka分布式消息系统实战(与JavaScalaHadoopStorm集成)
网盘地址:https://pan.baidu.com/s/1nwwhpP3 密码: mxu6
网盘地址:https://pan.baidu.com/s/1mjM5HaC 密码: xa5s
2 楼 kafodaote 2018-01-21  
Kafka分布式消息系统实战(与JavaScalaHadoopStorm集成)
网盘地址:https://pan.baidu.com/s/1c3JymAk 密码: dnky
网盘地址:https://pan.baidu.com/s/1eTV5ygU 密码: 3g3v
1 楼 成大大的 2017-10-18  
Kafka分布式消息系统实战(与JavaScalaHadoopStorm集成)
——https://pan.baidu.com/s/1cAm9AI 密码: 7fvt

内容简介
Kafka是分布式的消息队列,作为云计算服务的基石,它广泛的应用在实时数据流方面,是实时数据处理的数据中枢,广泛应用在很多互联网企业,例如:linkedin,facebook,腾讯,百度,阿里等。实时数据流是现在互联网公司、甚至拥有大规模数据的传统企业的主要模式, 实时数据(Real-time Activity Data)就是那些非交易,不需要秒级响应的数据, 但在后续的分析中产生极大作用,例如个性化推荐、运营服务监控、精细化营销、报表等 。

本课程的目的在于系统性地介绍Kafka分布式消息系统,掌握了Kafka,你就拿到了大数据处理领域消息处理机制的钥匙,能够轻松上手开发分布式消息系统应用程序开发和维护,笑傲大数据处理技术。学完本课程,你可以掌握:

1.Kafka的部署方式
2.Kafka的原理
3.Kafka与其他大数据组件的集成
4.基于Kafka的程序开发



第一章 Kafka的基本介绍
   1.1 什么是消息系统
   1.2 消息队列的分类
   1.3 Kafka的基本架构和概念
   1.4 ZooKeeper简介和安装

第二章 Kafka的原理解析
   2.1 Kafka的Producer处理逻辑
   2.2 Kafka的broker处理逻辑
   2.3 Kafka的Consumer处理逻辑
   2.4 Kafka集群部署在zk里的存储结构

第三章 Kafka的部署方式
   3.1 Kafka伪集群部署模式
   3.2 Kafka集群部署模式
   3.3 核心配置文件server.properties

第四章 Kafka的Java应用开发
   4.1 Producer端的实现
   4.2 Consumer端的实现

第五章 Kafka的Scala应用开发
   5.1 Scala的Producer的实现
   5.2 Scala的Producer的实现

第六章 Kafka与Hadoop的集成
   6.1 Hadoop简介和配置
   6.2 集成Kafka和Hadoop

第七章 Kafka与Flume的集成
   7.1 Flume简介和使用
   7.2 集成Kafka和Flume

第八章 Kafka与Storm的集成
   8.1 Storm的运行机制和部署
   8.2 Storm编程案例
   8.3 集成Kafka和Storm

相关推荐

    分布式消息系统Kafka.pdf

    本文主要介绍了分布式消息系统Kafka的概述、架构、应用场景、工作原理等方面的知识点。 1. Kafka 概述 Kafka 是一个快速、可扩展的、高吞吐的、可容错的分布式“发布-订阅”消息系统。使用 Scala 与 Java 语言编写...

    基于分布式的发布订阅消息系统Kafka

    **基于分布式的发布订阅消息系统Kafka** Kafka是一种高性能、可扩展的分布式消息系统,由LinkedIn开发并贡献给了Apache软件基金会。它被设计为一个实时处理大量数据的平台,适用于大数据流处理、日志聚合、网站活性...

    消息队列kafka介绍.pdf

    消息队列Kafka是一种分布式、高吞吐量、高可扩展性的消息服务系统。它最初是由LinkedIn公司开发,并于2011年捐赠给了Apache软件基金会,成为了开源项目之一。Kafka被设计为一个能够处理大规模数据流的平台,其高性能...

    kafka介绍及部署

    【Kafka介绍】 Apache Kafka是由LinkedIn开发并随后贡献给Apache软件基金会的一个开源流处理平台。Kafka最初设计的目的是为了处理大规模的实时数据流,它能够处理来自各种数据源的活跃流式数据,如页面访问统计、...

    5、kafka监控工具Kafka-Eagle介绍及使用

    Kafka 提供了高吞吐量、低延迟的消息传递能力,是大数据领域中重要的消息队列(MQ)解决方案。Kafka-Eagle 是针对 Kafka 集群设计的一款高效、易用的监控工具,旨在提供对 Kafka 的深度监控和管理。 Kafka-Eagle 的...

    Kafka入门、介绍、使用及部署

    Kafka是由LinkedIn于2010年12月开源的一款强大的消息系统,它主要用于处理活跃的流式数据,如网站的PV(页面浏览量)、用户行为数据等。传统日志分析系统虽然能够提供一种可扩展的离线处理日志信息的方案,但在实时...

    介绍kafka及kafka集群安装

    Kafka 是一款开源的分布式消息系统,以其高吞吐量、低延迟的特点,在大数据处理领域有着广泛的应用。Kafka 由 LinkedIn 开发并贡献给 Apache 软件基金会,最终成为其顶级项目之一。Kafka 的核心设计理念是为实时数据...

    星环大数据平台_Kafka消息发布与订阅.pdf

    它的分区(partitioning)和复制(replication)特性,进一步增强了消息系统的可用性和容错性。Kafka不仅能够处理海量数据的实时流式处理,还能够作为数据仓库与外部系统进行数据交换,支持各种实时计算任务和离线...

    1、kafka(2.12-3.0.0)介绍、部署及验证、基准测试

    作者提供的其他相关文章,如 Java API 的使用、Kafka 的关键概念详解、分区和副本介绍,以及 Kafka 监控工具 Kafka-Eagle 的使用,将更深入地探讨 Kafka 的功能和操作。 总之,Apache Kafka 是一个强大的工具,适用...

    Kafka介绍.pptx

    ### Kafka介绍 #### Kafka概述 Kafka是一种分布式发布-订阅消息系统,最初由LinkedIn公司开发,后成为Apache软件基金会的顶级项目。Kafka主要使用Scala语言编写,具有高吞吐量、可持久化、分布式扩展性强等特点。它...

    可视化kafka测试工具

    Kafka的核心功能包括发布订阅消息系统、高吞吐量的数据处理以及持久化数据存储。然而,对于开发者和运维人员来说,测试和监控Kafka集群的性能和功能往往需要借助一些工具。本文将详细介绍一款可视化Kafka测试工具,...

    kafka安装包-2.13-3.6.2

    **Kafka介绍** Apache Kafka是一款高性能、分布式的消息中间件,由LinkedIn开发并捐献给Apache软件基金会。它最初设计的目标是构建一个实时的数据管道,能够高效地处理大量的数据流,同时支持发布订阅和队列模型,...

    logback日志记录写入kafka

    本主题将详细介绍如何利用Logback和SLF4J来将日志记录到Kafka队列中,以及支持日志解析和过滤等扩展功能。 首先,我们需要理解SLF4J的工作原理。SLF4J提供了一组API,允许我们在应用程序中插入日志语句,而具体的...

    kafka原理介绍及参数.pptx

    消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。 顺序保证  在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证...

    Apache Kafka实战.pdf

    Kafka是一个高吞吐量、低延迟的消息发布订阅系统,常用于构建实时数据管道和流应用程序。以下是根据书中的内容提取的关键知识点: 1. **Kafka基础**:Kafka是一个分布式流处理平台,它允许发布和订阅持久化的消息流...

    Apache Kafka入门介绍.zip

    总之,Apache Kafka 是一个强大而灵活的分布式消息系统,适用于构建实时数据管道和流应用。通过了解它的核心概念、应用场景以及 Java API 的使用,开发者可以充分利用 Kafka 的优势,构建高性能的大数据处理系统。

    java实现flink订阅Kerberos认证的Kafka消息示例源码

    而Kafka是一个分布式消息系统,广泛用于构建实时数据管道和流应用。为了在Flink中安全地连接到Kafka,我们需要使用Kerberos协议,这是一个广泛采用的网络身份验证协议,可以提供互操作性和可扩展性。 1. **Kerberos...

    kafka.pdf 介绍 为何使用消息系统

    1.1. 为何使用消息系统 1.1.1. 解耦 在项目启动之初来预测将来项目会碰到什么需求, 是极其困难的。 消息系统在处理过程中间插入了一个隐含的、 基于数据的接口层, 两边的处理过程都要实现这一接口。 这允许你独立...

    Java + Kafka + ZooKeeper 构建高吞吐量分布式消息系统详解

    内容概要:文章详细介绍了如何使用 Java、Kafka 和 ZooKeeper 搭建一个高吞吐量的消息系统,涵盖了从环境准备、组件简介到实际编码的全过程。具体包括 Kafka 和 ZooKeeper 的基本概念、安装配置、生产者和消费者的 ...

    Kafka官方中文文档.pdf

    这部分内容着重描述了Kafka作为高性能、可伸缩消息系统的内部工作机制。 操作章节介绍了如何进行基本的Kafka操作,如添加和移除topics、更改topics、优雅地关闭Kafka、检查consumer的位置、集群间做数据镜像、扩展...

Global site tag (gtag.js) - Google Analytics