`
bit1129
  • 浏览: 1069621 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Kafka一】Kafka入门

 
阅读更多

这篇文章来自Spark集成Kafka(http://bit1129.iteye.com/blog/2174765),这里把它单独取出来,作为Kafka的入门吧

 

下载Kafka

 http://mirror.bit.edu.cn/apache/kafka/0.8.1.1/kafka_2.10-0.8.1.1.tgz 
 2.10表示Scala的版本,而0.8.1.1表示Kafka的版本 

 

解压Kafka

惊讶的是Kafka内置了Zookeeper的安装包以及启停Zookeeper的脚本,版本比较低,是3.3.4版本。理论上不应该使用Kafka的版本,因为Zookeeper是个通用分布式配置和协调系统.

实际上,也可以使用Kafka内置的Zookeeper,不过要注意,Zookeeper一般使用3台做集群,如果Kakfa的Broker多于3台,那么就取其中3台运行Zookeeper

 

配置Kafka

 1. 修改配置文件config/server.properties

 

 host.name和avertised.host.name默认是注释掉的,把它打开。一般情况下,不要使用localhost作为host.name,如果使用localhost,远程访问Kakfa服务器容易出现问题。

 

    # Hostname the broker will bind to. If not set, the server will bind to all interfaces  
    host.name=localhost  
      
    # Hostname the broker will advertise to producers and consumers. If not set, it uses the  
    # value for "host.name" if configured.  Otherwise, it will use the value returned from  
    # java.net.InetAddress.getCanonicalHostName().  
    advertised.host.name=localhost  

 

 

2. 配置Zookeeper

    本文使用单独安装的Zookeeper,而不是使用Kafka自带的Zookeeper,Kafka为了能够知道它要连接的Zookeeper地址,配置文件中提供了一系列和Zookeeper相关的配置参数

     除了安装运行独立的Zookeeper,Kafka也可以使用安装包里的Zookeeper,如果Kafka要使用自己的Zookeeper,那么需要在 Kafka的bin目录下启动Zookeeper。因此,如果使用独立的Zookeeper的时候,就无  需启动Kafka下面的Zookeeper了。在 Kafka启动过程中看到有关Zookeeper的日志,这是Kafka作为Zookeeper的客户端正在建立与Zookeeper服务器的通讯

  •  config/server.properties

 

  ############################# Zookeeper #############################  
      
    # Zookeeper connection string (see zookeeper docs for details).  
    # This is a comma separated host:port pairs, each corresponding to a zk  
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".  
    # You can also append an optional chroot string to the urls to specify the  
    # root directory for all kafka znodes.  
      
    //2181是Zookeeper的clientPort  
    zookeeper.connect=localhost:2181  
      
    # Timeout in ms for connecting to zookeeper  
    zookeeper.connection.timeout.ms=1000000  

 

  • config/producer.properties

    无相关配置

 

  • config/consumer.properties
    # Zookeeper connection string  
    # comma separated host:port pairs, each corresponding to a zk  
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"  
    zookeeper.connect=127.0.0.1:2181  
      
    # timeout in ms for connecting to zookeeper  
    zookeeper.connection.timeout.ms=1000000

 

启动Zookeeper

1.根据之前Kafka对Zookeeper的配置,Zookeeper应该配置端口2181端口
2. 使用如下命令启动Zookeeper,启动Zookeeper的参数如下:

 

    # The number of milliseconds of each tick  
    tickTime=2000  
    # The number of ticks that the initial   
    # synchronization phase can take  
    initLimit=10  
    # The number of ticks that can pass between   
    # sending a request and getting an acknowledgement  
    syncLimit=5  
    # the directory where the snapshot is stored.  
    # do not use /tmp for storage, /tmp here is just   
    # example sakes.  
    dataDir=/home/hadoop/software/zookeeper-3.4.6/data  
    # the port at which the clients will connect  
    clientPort=2181  

 

 启动Kafka

1.启动Kafka(需要指定server.properties)

 

 [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-server-start.sh config/server.properties  

 

2. 启动日志

 

2015-01-11 01:11:12,490] INFO Verifying properties (kafka.utils.VerifiableProperties)  
    ////broker.id是在server.properties中定义的
    [2015-01-11 01:11:12,558] INFO Property broker.id is overridden to 0 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,558] INFO Property log.cleaner.enable is overridden to false (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,558] INFO Property log.dirs is overridden to /tmp/kafka-logs (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,558] INFO Property log.retention.check.interval.ms is overridden to 60000 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,559] INFO Property log.retention.hours is overridden to 168 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,559] INFO Property log.segment.bytes is overridden to 536870912 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,559] INFO Property num.io.threads is overridden to 8 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,559] INFO Property num.network.threads is overridden to 2 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,559] INFO Property num.partitions is overridden to 2 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,559] INFO Property port is overridden to 9092 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,559] INFO Property socket.receive.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,560] INFO Property socket.request.max.bytes is overridden to 104857600 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,560] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,560] INFO Property zookeeper.connect is overridden to localhost:2181 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,560] INFO Property zookeeper.connection.timeout.ms is overridden to 1000000 (kafka.utils.VerifiableProperties)  
    [2015-01-11 01:11:12,607] INFO [Kafka Server 0], starting (kafka.server.KafkaServer)  
    [2015-01-11 01:11:12,609] INFO [Kafka Server 0], Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)  
    [2015-01-11 01:11:12,640] INFO Client environment:zookeeper.version=3.3.3-1203054, built on 11/17/2011 05:47 GMT (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:host.name=hadoop.master (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:java.version=1.7.0_67 (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:java.home=/home/hadoop/software/jdk1.7.0_67/jre (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:java.class.path=:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/dependant-libs-2.8.0/*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../perf/build/libs//kafka-perf_2.8.0*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../clients/build/libs//kafka-clients*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../examples/build/libs//kafka-examples*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-consumer/build/libs//kafka-hadoop-consumer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../contrib/hadoop-producer/build/libs//kafka-hadoop-producer*.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/jopt-simple-3.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-javadoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-scaladoc.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/kafka_2.10-0.8.1.1-sources.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/log4j-1.2.15.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/metrics-core-2.2.0.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/scala-library-2.10.1.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/slf4j-api-1.7.2.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/snappy-java-1.0.5.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zkclient-0.3.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../libs/zookeeper-3.3.4.jar:/home/hadoop/software/kafka_2.10-0.8.1.1/bin/../core/build/libs/kafka_2.8.0*.jar (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:user.name=hadoop (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:user.home=/home/hadoop (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,640] INFO Client environment:user.dir=/home/hadoop/software/kafka_2.10-0.8.1.1 (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,641] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@7a50a6d4 (org.apache.zookeeper.ZooKeeper)  
    [2015-01-11 01:11:12,643] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)  
    [2015-01-11 01:11:12,706] INFO Opening socket connection to server localhost/127.0.0.1:2181 (org.apache.zookeeper.ClientCnxn)  
    [2015-01-11 01:11:12,716] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)  
    [2015-01-11 01:11:12,756] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x14ad79bb13d0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)  
    [2015-01-11 01:11:12,759] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)  
    ////log.dirs不存在,先创建
    [2015-01-11 01:11:12,919] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)  
    [2015-01-11 01:11:12,948] INFO Starting log cleanup with a period of 60000 ms. (kafka.log.LogManager)  
    [2015-01-11 01:11:12,975] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)  
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    SLF4J: Defaulting to no-operation (NOP) logger implementation  
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
    [2015-01-11 01:11:13,039] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)  
    [2015-01-11 01:11:13,063] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)  
    [2015-01-11 01:11:13,163] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$) 
    ///只有1台Zookeeper服务器,因此serverid为0的Zookeeper成为leader 
    [2015-01-11 01:11:13,219] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)  
    ////注册Broker到Zookeeper的/brokers/ids/0节点
    [2015-01-11 01:11:13,367] INFO Registered broker 0 at path /brokers/ids/0 with address hadoop.master:9092. (kafka.utils.ZkUtils$)  
    [2015-01-11 01:11:13,379] INFO [Kafka Server 0], started (kafka.server.KafkaServer)  
    [2015-01-11 01:11:13,486] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)  

 

 

Kafka简单测试

1.创建一个topic

参数信息:Zookeeper的信息,Topic的名字,Topic的Partition数,复制因子(复制因子必须小于等于Broker数目)

 

 

    ///创建一个Topic,取名为test
    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1  --topic test  
    Created topic "test".  
    ///列出创建的Topic,这里只有一个test  
    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-topics.sh --list --zookeeper localhost:2181  
    test  

 

2. Producer创建消息
    启动时,除了打印SLF4J之外,没有别的。下面可以直接输入生产的数据(生产消息时不需要指定Partition,Kafka自动做Partition路由,每个Partition都是有Lead Partition和Follower Partitions组成,Lead Partition负责读写,而Follower Partitions只做复制,在Lead Partition挂了之后,自动做Failover)

 

    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test  
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    SLF4J: Defaulting to no-operation (NOP) logger implementation  
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
      
      
    This is mesage  
    This is a test  

 


3. Consumer消费消息


    启动时,除了打印SLF4J之外,没有别的

 

   --from-beginning实际上是指定offset的读取策略。

对于smallest和largest还是理解不到位,smallest和largest策略表示Zookeeper上的offset还没有初始化为正确值时,如何初始化offset的问题?试想,Producer生产了一批消息到Kafka中,但是Kafka尚未由任何Consumer读取,而Kafka的Offset是由Consumer进行初始化和赋值的,因此此时的Zookeeper上的offset并没有预期的0(0表示尚未读取过),而是一个不正确的随机数,那么Consumer来读取消息时,是从头开始读还是从最大的位置等待Producer创建消息后再读取,此时就产生了两个选择,smallest表示从头读,largest表示从最大位置读

 

auto.offset.reset(默认是largest):

What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
* smallest : automatically reset the offset to the smallest offset
* largest : automatically reset the offset to the largest offset
* anything else: throw exception to the consumer

 

 

    [hadoop@hadoop kafka_2.10-0.8.1.1]$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".  
    SLF4J: Defaulting to no-operation (NOP) logger implementation  
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
      
      
    This is mesage  
    This is a test 

 

4.此时在Producer终端输入内容,在Consumer终端可以立即收到

 

分享到:
评论

相关推荐

    Kafka入门、介绍、使用及部署

    ### Kafka入门、介绍、使用及部署 #### 一、Kafka简介 Kafka是由LinkedIn于2010年12月开源的一款强大的消息系统,它主要用于处理活跃的流式数据,如网站的PV(页面浏览量)、用户行为数据等。传统日志分析系统虽然...

    Kafka技术内幕-图文详解Kafka源码设计与实现

    Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...

    kafka入门到精通.txt

    ### Kafka入门到精通知识点概述 #### 一、Kafka简介 Kafka是由Apache软件基金会开发的一款开源流处理平台,主要用于构建实时数据管道以及基于流的数据处理应用。它以极高的吞吐量、低延迟和可扩展性而著称,被广泛...

    kafka入门必备手册

    下面将详细介绍Kafka入门的相关知识点。 **Kafka简介** Kafka是一个分布式流处理平台,它的核心功能是消息队列系统,用于处理大量实时数据的发布和订阅。它被设计为在分布式环境中运行,可以处理高并发的读写请求,...

    Apache Kafka入门介绍.zip

    **Apache Kafka 入门介绍** Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并贡献给了 Apache 软件基金会。Kafka 设计用于构建实时数据管道和流应用,能够高效地处理大量实时数据。它结合了消息队列和日志...

    大数据Kafka入门--理论+实践

    在大数据Kafka入门中,首先要了解的理论知识包括了Kafka的基本概念和架构设计。Kafka作为一个分布式的消息系统,其核心组件包括了Producer(生产者)、Consumer(消费者)、Broker(节点)和Topic(主题)。生产者...

    Kafka技术内幕图文详解源码设计与实现

    Kafka自LinkedIn开源以来就以高性能、高吞吐量、分布式的特性著称,本书以0.10版本的源码为基础,深入分析了Kafka的设计与实现,包括生产者和消费者的消息处理流程,新旧消费者不同的设计方式,存储层的实现,协调者...

    Kafka 入门基础篇.pptx

    Kafka 入门基础篇 Kafka 是 LinkedIn 公司开发的一种分布式消息队列系统,支持离线和在线日志处理。它可以实时处理大量数据,满足各种需求场景,如基于 Hadoop 的批处理系统、低延迟的实时系统、Storm/Spark 流式...

    kafka入门资料

    ### Kafka入门知识点详解 #### 一、概述与系统环境 Kafka是一款强大的分布式消息系统,主要应用于实时数据处理场景。其高效的数据传输能力和高吞吐量特性使其在大数据领域受到广泛青睐。 - **系统环境**:本文档...

    Kafka入门.pdf

    Apache Kafka是一个分布式发布-订阅消息系统,也是一个强大的队列系统,能够处理大量数据,并支持离线和在线消息消费。Kafka的消息被保留磁盘上,并在集群内复制以防数据丢失。它建立在ZooKeeper同步服务之上,与...

    Kafka官方中文文档.pdf

    Apache Kafka是一个分布式流处理平台,它具备三个关键特性:发布与订阅记录的能力、持久化地存储这些记录,并且在记录生成时即可进行处理。Kafka适合用于两大类场景:一是构建实时数据管道,能够在系统或应用之间...

    图解 Kafka 之实战指南

    #### 一、Kafka简介 **Kafka** 起初由LinkedIn采用Scala语言开发,后捐赠给Apache基金会,现已成为一款广泛应用于分布式流处理平台的成熟软件。它凭借高吞吐量、可持久化存储、水平扩展能力、支持流数据处理等特性...

    21.消息中间件之Kafka入门讲解

    **Kafka入门详解** Kafka是一款高性能、分布式的消息中间件,最初由LinkedIn开发,后成为Apache顶级项目。它主要用于处理实时数据流,提供高吞吐量的发布订阅服务,同时也支持离线数据处理。在本篇文章中,我们将...

    Kafka简介.pptx

    Kafka的PPT讲义,入门级 Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作 流数据。 这种动作(网页浏览...

    springboot集成kafka简单入门案例

    这只是一个简单的入门案例,实际上Kafka和Spring Boot的整合功能远不止这些。例如,您可以配置多个主题、设置消费者分组、实现幂等性消费者、处理异常、使用Kafka Streams进行复杂的数据处理等。随着对Kafka和Spring...

    kafkapython教程-Kafka快速入门(十二)-Python客户端.pdf

    在本篇Kafka快速入门教程中,我们主要探讨了如何使用Python客户端库`confluent-kafka`来与Apache Kafka进行交互。`confluent-kafka`是一个轻量级的Python模块,它对librdkafka进行了封装,支持Kafka 0.8以上的版本。...

    kafka一小时入门精讲笔记.zip

    【Kafka一小时入门精讲笔记】 在大数据处理和实时流计算领域,Apache Kafka是一款非常重要的开源消息系统。本精讲笔记将带你快速了解Kafka的基本概念、核心特性以及使用场景,助你在一小时内掌握Kafka的基础知识。 ...

Global site tag (gtag.js) - Google Analytics