`
bit1129
  • 浏览: 1069595 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

【Kakfa五】Kafka Producer和Consumer基本使用

 
阅读更多

0.Kafka服务器的配置

一个Broker,

一个Topic

Topic中只有一个Partition()

 

1. Producer:

package kafka.examples.producers;


import kafka.producer.KeyedMessage;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class SimpleProducer {
    private static Producer<Integer, String> producer;
    private static final Properties props = new Properties();
    ///ProducerConfig没有关于Zookeeper的配置信息
    static {
        props.put("broker.list", "192.168.26.140:9092");

        /*metadata.broker.list is for bootstrapping and the producer will only use it for getting 
        metadata (topics, partitions and replicas). The socket connections for 
        sending the actual data will be established based on the broker 
        information returned in the metadata. The format is 
        host1:port1,host2:port2, and the list can be a subset of brokers or a 
        VIP pointing to a subset of brokers.*/
        props.put("metadata.broker.list", "192.168.26.140:9092");

        /*The serializer class for messages. The default encoder(kafka.serializer.DefaultEncoder) takes a byte[] and returns the same byte[].*/
        props.put("serializer.class", "kafka.serializer.StringEncoder");

        /**/
        props.put("request.required.acks", "1");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
    }

    public static void main(String[] args) {
        String topic = "learn.topic";
        String messageStr = "This is a simple message from JavaAPI Producer2";
        ///Key如何生成的?
        KeyedMessage<Integer, String> data = new KeyedMessage<Integer,String>(topic, messageStr);
        producer.send(data);
        producer.close();
    }
}

 

 

关于request.required.acks:

 

This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are

  • 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
  • 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
  • -1, The producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the greatest level of durability. However, it does not completely eliminate the risk of message loss because the number of in sync replicas may, in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting. Please read the Replication section of the design documentation for a more in-depth discussion.

关于KeyedMessage:

/**
 * A topic, key, and value.
 * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
 */
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  if(topic == null)
    throw new IllegalArgumentException("Topic cannot be null.")
  
  def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
  
  def this(topic: String, key: K, message: V) = this(topic, key, key, message)
  //分区键,如果没有,是什么行为
  def partitionKey = {
    if(partKey != null)
      partKey
    else if(hasKey)
      key
    else
      null  
  }
  
  def hasKey = key != null
}

 

 

 

2. Consumer

package kafka.examples.consumers;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHLConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String
            topic) {
        ///Consumer的属性配置
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        //consumer group id
        props.put("group.id", groupId);
        /*
        ZooKeeper session timeout. If the server fails to heartbeat to ZooKeeper
        within this period of time it is considered dead. If you set this too 
        low the server may be falsely considered dead; if you set it too high it
        may take too long to recognize a truly dead server.
        */
        props.put("zookeeper.session.timeout.ms", "500"); //默认6秒
        ///How far a ZK follower can be behind a ZK leader.默认两秒
        props.put("zookeeper.sync.time.ms", "250");
        ///offset自动提交的时间间隔
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void doConsume() {
        Map<String, Integer> topicCount = new HashMap<String, Integer>();
        // Define single thread for topic
        topicCount.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        //KafkaStream是一个BlockingQueue
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        ///有几个线程,就会有几个Kafka Stream
        for (final KafkaStream stream : streams) {
            /**
             * An iterator that blocks until a value can be read from the supplied queue.
             * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown
             *
            */
            ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
            ///阻塞在hasNext等待消息到来
            while (consumerIte.hasNext()) {
                System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message()));
            }
        }
        if (consumer != null) {
            consumer.shutdown();
        }
    }

    public static void main(String[] args) {
        String topic = "learn.topic";
        ////learn.topic.consumers.group是消费者群组,不需要预先定义,但是会记录到Zookeeper中
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.26.140:2181", "learn.topic.consumers.group", topic);
        simpleHLConsumer.doConsume();
    }
}

 

3. 注意的问题:

因为Kafka服务器和Producer、Consumer不在同一个机器上,因此在配置Kafka中的Zookeeper连接信息以及server.properties中的host.name时,需要指定具体的IP,不能使用localhost

 

 

分享到:
评论

相关推荐

    Python-kafka集群搭建PythonAPI调用Producer和Consumer

    在本教程中,我们将探讨如何搭建一个支持SASL(Simple Authentication and Security Layer)认证的Kafka集群,并使用Python API来创建Producer和Consumer。 ### 1. 安装Kafka 首先,我们需要在服务器上安装...

    Kafka客户端producer/consumer样例

    Kafka客户端producer/consumer样例

    spring-kafka-producer-consumer-example_java_springboot_kafka_

    标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...

    kafka_client_producer_consumer

    kafka_client_producer_consumer

    Kafka-Simple-Producer-Consumer:使用Java 8的kafka的生产者和消费者的简单变化

    在"Kafka-Simple-Producer-Consumer-master"这个项目中,可能包含了示例代码,展示了如何使用Java 8编写Kafka的生产者和消费者。这些示例通常会包含以下部分: - 生产者类(ProducerClass.java):创建并配置...

    pentaho-kafka-consumer.zip

    总结起来,"pentaho-kafka-consumer.zip"是一个用于Pentaho Kettle的插件,它提供了与Apache Kafka集成的能力,使得用户可以方便地从Kafka中消费数据,并在Pentaho的工作流中进行进一步的数据处理和集成操作。...

    kafka权威指南目录.zip

    一本经典的kafka入门书籍。 Kafka is a distributed,partitioned,replicated commit logservice。...无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。

    Springboot集成Kafka实现producer和consumer的示例代码

    这只是一个基本的示例,实际使用时可以根据需求进行更复杂的配置,如定制序列化器、设置回调函数、处理异常等。同时,确保 Kafka 服务和 ZooKeeper 正常运行,以便于 Spring Boot 应用程序能够顺利与 Kafka 集群交互...

    kafka java 下载的jar

    Kafka的核心组件包括生产者(Producer)、消费者(Consumer)和代理(Broker)。生产者负责发布消息到主题(Topic),消费者则订阅并消费这些消息,而代理是Kafka集群中的节点,它们接收、存储和转发消息。 在Java...

    kafka 18道面试题和答案.docx

    本文将总结 Kafka 的一些重要知识点,包括 Kafka 的基本概念、producer 和 consumer 的命令行、pull 和 push 模式、消费状态跟踪方法等。 Kafka 的基本概念 Kafka 是一个基于发布-订阅模式的消息队列系统,主题...

    Kafka简介及使用PHP处理Kafka消息

    Kafka 的整体架构非常简单,producer、broker(Kafka)和 consumer 都可以有多个。Producer,consumer 实现 Kafka 注册的接口,数据从 producer 发送到 broker,broker 承担一个中间缓存和分发的作用。broker 分发...

    星环大数据平台_Kafka消息发布与订阅.pdf

    综上所述,在星环大数据平台中使用Kafka进行消息发布与订阅涉及到多个组件和步骤,包括安装和配置TDHClient,创建和管理Kafka Topic,以及通过Kafka Console Producer和Kafka Console Consumer进行消息的发布和订阅...

    kafka资源文件.

    这个库提供了Producer和Consumer的API,使得我们可以方便地发送和接收消息。例如,创建一个Producer,我们需要配置Broker的地址、主题等信息,然后调用send()方法将消息发送到指定主题。对于Consumer,我们可以通过...

    kafka-producer-consumer

    这个名为"Kafka-producer-consumer"的项目显然关注的是Kafka中的生产者(Producer)和消费者(Consumer)组件,这两部分是Kafka生态系统中的核心元素。在这里,我们将深入探讨Kafka的生产者与消费者模型,以及如何...

    kafka-java-demo 基于java的kafka生产消费者示例

    除了基本的生产和消费功能,Kafka还支持一些高级特性,如幂等性生产者、事务性消费者、连接器(Connectors)以及Kafka Streams,这些都可能在"Kafka-java-demo"中有所体现,帮助你更好地理解和应用Kafka。...

    kettle整合kafka生产者消费者插件

    kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。

    java开发kafka-clients所需要的所有jar包以及源码

    包括生产者客户端(Producer API)和消费者客户端(Consumer API),用于发布消息到主题(Topics)和订阅/消费消息。 3. **Java开发Kafka生产者**: - 使用`KafkaProducer`类创建生产者实例,需要配置如bootstrap ...

    阿里云消息队列kafka demo

    阿里云消息队列Kafka Demo是一个基于Scala编程语言实现的应用示例,旨在帮助开发者了解如何在阿里云环境中使用Kafka进行消息生产和消费。Kafka是一种分布式流处理平台,常用于构建实时数据管道和流应用,它能够高效...

    Kafka使用手册

    Kafka 使用手册 Kafka 是一个高性能、分布式的消息队列系统,广泛应用于大数据处理、实时数据处理和流式数据处理等领域。本文档将详细介绍 Kafka 的安装步骤、基本操作命令和配置文件的修改,以便让初学者快速入门...

    C#kafka开发实例

    Kafka是一个分布式流处理平台,它允许我们创建发布(Producer)和订阅(Consumer)主题(Topic)。主题是逻辑上的分类,类似于数据库中的表。发布者向主题发布消息,而消费者则从主题订阅并消费这些消息。Kafka通过...

Global site tag (gtag.js) - Google Analytics