`

初尝Apache之kafka

阅读更多

一、环境准备:

       下载kafka官方安装包:http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz

        获取最新的下载包: http://kafka.apache.org/downloads.html

        服务器使用:centos7  jdk1.8或者jdk1.7

        kafka版本:kafka_2.10-0.9.0.1.tgz

二、安装JDK

       解压配置环境变量即可(已安装openjdk,需要先卸载)

       rpm包安装参照:http://wkm.iteye.com/blog/1249553

三、安装kafka:

       # tar -xvf kafka_2.10-0.9.0.1.tgz

       kafka解压即可使用

四、启动kafka

       kafka依赖 ZooKeeper服务,故先要启动ZooKeeper服务。

       kafka中带有ZooKeeper服务,启动方式如下:

       # cd kafka_2.10-0.9.0.1

       # ./bin/zookeeper-server-start.sh config/zookeeper.properties

       ZooKeeper服务默认端口为 2181

 

       接下来启动kafka服务

       # ./bin/kafka-server-start.sh config/server.properties

       kafka服务默认端口为 9092

 

五、测试kafka

       1、创建名称为my-topic的topic:

       # ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-topic

       2、查询topic列表

       # ./bin/kafka-topics.sh --list --zookeeper localhost:2181

          my-topic

       3、启动消息生产者并发布消息:

       # ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic

          message1

       4、启动消息消费者订阅接收消息:

       # ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic my-topic --from-beginning

          message1

  

       到这里kafka单击服务已经成功启动,并可以发送和接收消息了。

六、关闭kafka

      1、关闭kafka

       # ./bin/kafka-server-stop.sh

       2、关闭ZooKeeper服务

       # ./bin/zookeeper-server-stop.sh

 

 -------到这里,单机版的kafka服务已成功完成安装配置了,下面我们用kafka-client来发布订阅消息---------

   生产者代码样例详见官方文档:http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

   消费者代码样例详见官方文档:http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

   注意:由于上面的kafka服务配置的是单机版的,下面的代码运行需要与kafka服务在同一台机器上。

   如果你的kafka-client和kafka服务不在同一台机器上,修改kafka服务配置文件

   修改配置文件  kafka_2.10-0.9.0.1/config/server.properties

    1、将

         #advertised.host.name=<hostname routable by clients>

         改为

          advertised.host.name=kafka机器的IP地址

    2、将

         #host.name=localhost

         改为

         host.name=kafka机器的IP地址

   添加依赖包:kafka-client

   gradle 项目:

  

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '0.9.0.1'

 

   maven项目:
  

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

 

 

    参照官方样例(生产者):

 

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        for(Object key:props.keySet()){
            LOG.info("key:" + key + ";value:" + props.get(key));
        }
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for(int i = 0; i < 10000; i++) {
            LOG.info("send ..." + i);
            ProducerRecord record = new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i));
            LOG.info("p:" + record.partition());
            LOG.info("t:" + record.topic());
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(metadata != null) {
                        LOG.info("metadata:" + metadata.topic() + "" + metadata.partition() + "" + metadata.offset());
                    } else {
                        LOG.info("metadata:" + metadata);
                    }
                    if(exception != null) {
                        LOG.info("exception:" + exception.getMessage(), exception);
                    } else {
                        LOG.info("exception:" + exception);
                    }
                }
            });
        }
        producer.close();
     参照官方样例(消费者):

 

   
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test1");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        for(Object key:props.keySet()){
            LOG.info("key:" + key + ";value:" + props.get(key));
        }
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("my-topic"));
        int count = 0;
        while (count < 1000) {
            LOG.info("receive message...");
            ConsumerRecords<String, String> records = consumer.poll(1000);
            LOG.info("received");
            for (ConsumerRecord<String, String> record : records) {
                LOG.info("offset = " + record.offset() + ", key = " + record.key() + ", value = "  + record.value());
            }
            count ++;
        }
        consumer.close();
     已经过测试,希望本文能帮到您。
(已迁移)
分享到:
评论

相关推荐

    org.apache.kafka kafka_2.10 0.10.2.0 的jar包下载

    依赖方式  &lt;groupId&gt;org.apache.kafka  &lt;artifactId&gt;kafka_2.10  &lt;version&gt;0.10.0.0 但是没有被中央仓库的任何jar包依赖!您可以在这里点击下载

    org.apache.kafka kafka-clients 0.10.1.1 的jar包下载

    &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-clients &lt;version&gt;0.10.1.1 &lt;/dependency&gt;

    Apache Kafka实战.pdf

    《Apache Kafka实战》这本书深入浅出地介绍了Apache Kafka这一分布式流处理平台的各个方面,旨在帮助读者掌握Kafka的实际应用和核心概念。Kafka是一个高吞吐量、低延迟的消息发布订阅系统,常用于构建实时数据管道和...

    org.apache.kafka kafka-log4j-appender 的jar包下载

    &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-log4j-appender &lt;version&gt;0.10.2.0 &lt;/dependency&gt;

    Apache Jmeter Kafka Jar包

    【Apache Jmeter Kafka Jar包】是Apache JMeter的扩展组件,专为测试和性能评估Apache Kafka消息系统设计。这个jar包允许用户在JMeter测试计划中直接发送和接收Kafka消息,从而对Kafka集群的性能进行深入分析。 ...

    apache-kafka-1.0.0 java Demo

    Apache Kafka 是一个分布式流处理平台,常用于构建实时数据管道和流应用。在这个"apache-kafka-1.0.0 java Demo"中,我们将探讨如何使用Java API与Kafka进行交互,实现基本的消息生产与消费功能。这个Demo虽然没有...

    Spring for Apache Kafka API(Spring for Apache Kafka 开发文档).CHM

    Spring for Apache Kafka API。 Spring for Apache Kafka 开发文档。

    Learning Apache Kafka 2nd Edition

    This book is here to help you get familiar with Apache Kafka and to solve your challenges related to the consumption of millions of messages in publisher-subscriber architectures. It is aimed at ...

    apache kafka

    Kafka是一个对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。 1.Kafka集群包含一个或多个服务器,这种服务器被称为broker 2.Partition是物理上的概念,每个Topic...

    apache kafka pdf下载

    Apache Kafka是一款分布式流处理平台,它被设计用来处理大量数据,并且提供一个统一、高吞吐量、低延迟的处理数据管道。由于Kafka的这些特性,它在大数据处理和实时数据管道领域得到了广泛的应用。Kafka作为一个开源...

    Apache Kafka Cookbook(PACKT,2015)

    《Apache Kafka Cookbook》是由PACKT在2015年出版的一本专著,专注于介绍Apache Kafka这一分布式流处理平台的实战技巧和最佳实践。Apache Kafka是一个高性能、可扩展且容错性强的消息中间件,它被广泛应用于大数据...

    Building Data Streaming Applications with Apache Kafka

    Apache Kafka is a popular distributed streaming platform that acts as a messaging queue or an enterprise messaging system. It lets you publish and subscribe to a stream of records and process them in ...

    Apache Kafka(带书签)

    Kafka的核心特性包括高性能、可扩展性、持久性和可靠性,使其成为现代大数据生态系统中的关键组件之一。 #### 设置Apache Kafka集群 在设置Apache Kafka集群之前,需要了解其架构和组成部分: - **Broker**:Kafka...

    Streaming Architecture New Designs Using Apache Kafka and MapR Streams

    本文将深入探讨"Streaming Architecture New Designs Using Apache Kafka and MapR Streams"这一主题,阐述如何利用这两种强大的工具构建高效、可扩展的流处理系统。 Apache Kafka是一种分布式流处理平台,由...

    apache-kafka-documentation-cn.zip_apache kafka_kafka

    Apache Kafka 是一个分布式流处理平台,由LinkedIn 开发并捐赠给了Apache 软件基金会,现在已成为大数据领域的重要组件。Kafka 主要用于构建实时数据管道和流应用,它能够高效地处理大量的实时数据,同时提供了消息...

    Apache Kafka.pdf

    ### Apache Kafka 知识点概览 #### 一、引言 Apache Kafka 是一款开源的流处理平台,由 LinkedIn 开发并捐赠给 Apache 软件基金会。它以一种高吞吐量、低延迟的方式处理实时数据流,并支持可扩展性、持久性和容错性...

    Building Data Streaming Applications with Apache Kafka azw3

    Building Data Streaming Applications with Apache Kafka 英文azw3 本资源转载自网络,如有侵权,请联系上传者或csdn删除 本资源转载自网络,如有侵权,请联系上传者或csdn删除

    Learning Apache Kafka, 2nd Edition

    Learning Apache Kafka Second Edition provides you with step-by-step, practical examples that help you take advantage of the real power of Kafka and handle hundreds of megabytes of messages per second ...

    li-apache-kafka-clients:li-apache-kafka-clients是Apache Kafka香草客户端的包装库。 它在开源Apache Kafka中提供了其他功能,例如大消息支持以及对Java生产者和消费者的审核。

    li-apache-kafka-clients介绍li-apache-kafka-clients是在香草Apache Kafka客户端之上构建的包装Kafka客户端库。 Apache Kafka现在已成为非常流行的消息传递系统,并以其低延迟,高吞吐量和持久的消息传递而闻名。 ...

Global site tag (gtag.js) - Google Analytics