0.Kafka服务器的配置
一个Broker,
一个Topic
Topic中只有一个Partition()
1. Producer:
package kafka.examples.producers; import kafka.producer.KeyedMessage; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; import java.util.Properties; public class SimpleProducer { private static Producer<Integer, String> producer; private static final Properties props = new Properties(); ///ProducerConfig没有关于Zookeeper的配置信息 static { props.put("broker.list", "192.168.26.140:9092"); /*metadata.broker.list is for bootstrapping and the producer will only use it for getting metadata (topics, partitions and replicas). The socket connections for sending the actual data will be established based on the broker information returned in the metadata. The format is host1:port1,host2:port2, and the list can be a subset of brokers or a VIP pointing to a subset of brokers.*/ props.put("metadata.broker.list", "192.168.26.140:9092"); /*The serializer class for messages. The default encoder(kafka.serializer.DefaultEncoder) takes a byte[] and returns the same byte[].*/ props.put("serializer.class", "kafka.serializer.StringEncoder"); /**/ props.put("request.required.acks", "1"); producer = new Producer<Integer, String>(new ProducerConfig(props)); } public static void main(String[] args) { String topic = "learn.topic"; String messageStr = "This is a simple message from JavaAPI Producer2"; ///Key如何生成的? KeyedMessage<Integer, String> data = new KeyedMessage<Integer,String>(topic, messageStr); producer.send(data); producer.close(); } }
关于request.required.acks:
This value controls when a produce request is considered completed. Specifically, how many other brokers must have committed the data to their log and acknowledged this to the leader? Typical values are
- 0, which means that the producer never waits for an acknowledgement from the broker (the same behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees (some data will be lost when a server fails).
- 1, which means that the producer gets an acknowledgement after the leader replica has received the data. This option provides better durability as the client waits until the server acknowledges the request as successful (only messages that were written to the now-dead leader but not yet replicated will be lost).
- -1, The producer gets an acknowledgement after all in-sync replicas have received the data. This option provides the greatest level of durability. However, it does not completely eliminate the risk of message loss because the number of in sync replicas may, in rare cases, shrink to 1. If you want to ensure that some minimum number of replicas (typically a majority) receive a write, then you must set the topic-level min.insync.replicas setting. Please read the Replication section of the design documentation for a more in-depth discussion.
关于KeyedMessage:
/** * A topic, key, and value. * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored. */ case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) { if(topic == null) throw new IllegalArgumentException("Topic cannot be null.") def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message) def this(topic: String, key: K, message: V) = this(topic, key, key, message) //分区键,如果没有,是什么行为 def partitionKey = { if(partKey != null) partKey else if(hasKey) key else null } def hasKey = key != null }
2. Consumer
package kafka.examples.consumers; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import kafka.consumer.Consumer; import kafka.consumer.ConsumerConfig; import kafka.consumer.ConsumerIterator; import kafka.consumer.KafkaStream; import kafka.javaapi.consumer.ConsumerConnector; public class SimpleHLConsumer { private final ConsumerConnector consumer; private final String topic; public SimpleHLConsumer(String zookeeper, String groupId, String topic) { ///Consumer的属性配置 Properties props = new Properties(); props.put("zookeeper.connect", zookeeper); //consumer group id props.put("group.id", groupId); /* ZooKeeper session timeout. If the server fails to heartbeat to ZooKeeper within this period of time it is considered dead. If you set this too low the server may be falsely considered dead; if you set it too high it may take too long to recognize a truly dead server. */ props.put("zookeeper.session.timeout.ms", "500"); //默认6秒 ///How far a ZK follower can be behind a ZK leader.默认两秒 props.put("zookeeper.sync.time.ms", "250"); ///offset自动提交的时间间隔 props.put("auto.commit.interval.ms", "1000"); consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props)); this.topic = topic; } public void doConsume() { Map<String, Integer> topicCount = new HashMap<String, Integer>(); // Define single thread for topic topicCount.put(topic, new Integer(1)); Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount); //KafkaStream是一个BlockingQueue List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); ///有几个线程,就会有几个Kafka Stream for (final KafkaStream stream : streams) { /** * An iterator that blocks until a value can be read from the supplied queue. * The iterator takes a shutdownCommand object which can be added to the queue to trigger a shutdown * */ ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator(); ///阻塞在hasNext等待消息到来 while (consumerIte.hasNext()) { System.out.println("Message from Single Topic :: " + new String(consumerIte.next().message())); } } if (consumer != null) { consumer.shutdown(); } } public static void main(String[] args) { String topic = "learn.topic"; ////learn.topic.consumers.group是消费者群组,不需要预先定义,但是会记录到Zookeeper中 SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.26.140:2181", "learn.topic.consumers.group", topic); simpleHLConsumer.doConsume(); } }
3. 注意的问题:
因为Kafka服务器和Producer、Consumer不在同一个机器上,因此在配置Kafka中的Zookeeper连接信息以及server.properties中的host.name时,需要指定具体的IP,不能使用localhost
相关推荐
在本教程中,我们将探讨如何搭建一个支持SASL(Simple Authentication and Security Layer)认证的Kafka集群,并使用Python API来创建Producer和Consumer。 ### 1. 安装Kafka 首先,我们需要在服务器上安装...
标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...
kafka_client_producer_consumer
在"Kafka-Simple-Producer-Consumer-master"这个项目中,可能包含了示例代码,展示了如何使用Java 8编写Kafka的生产者和消费者。这些示例通常会包含以下部分: - 生产者类(ProducerClass.java):创建并配置...
总结起来,"pentaho-kafka-consumer.zip"是一个用于Pentaho Kettle的插件,它提供了与Apache Kafka集成的能力,使得用户可以方便地从Kafka中消费数据,并在Pentaho的工作流中进行进一步的数据处理和集成操作。...
一本经典的kafka入门书籍。 Kafka is a distributed,partitioned,replicated commit logservice。...无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
这只是一个基本的示例,实际使用时可以根据需求进行更复杂的配置,如定制序列化器、设置回调函数、处理异常等。同时,确保 Kafka 服务和 ZooKeeper 正常运行,以便于 Spring Boot 应用程序能够顺利与 Kafka 集群交互...
Kafka的核心组件包括生产者(Producer)、消费者(Consumer)和代理(Broker)。生产者负责发布消息到主题(Topic),消费者则订阅并消费这些消息,而代理是Kafka集群中的节点,它们接收、存储和转发消息。 在Java...
本文将总结 Kafka 的一些重要知识点,包括 Kafka 的基本概念、producer 和 consumer 的命令行、pull 和 push 模式、消费状态跟踪方法等。 Kafka 的基本概念 Kafka 是一个基于发布-订阅模式的消息队列系统,主题...
在本文中,我们将深入探讨如何在Apache Kafka中配置SASL/PLAIN认证机制,并通过具体的密码验证实现安全的通信...这为保障数据的安全传输提供了基础,同时,了解和实践这些配置将有助于我们更好地理解和管理Kafka系统。
Kafka 的整体架构非常简单,producer、broker(Kafka)和 consumer 都可以有多个。Producer,consumer 实现 Kafka 注册的接口,数据从 producer 发送到 broker,broker 承担一个中间缓存和分发的作用。broker 分发...
综上所述,在星环大数据平台中使用Kafka进行消息发布与订阅涉及到多个组件和步骤,包括安装和配置TDHClient,创建和管理Kafka Topic,以及通过Kafka Console Producer和Kafka Console Consumer进行消息的发布和订阅...
这个库提供了Producer和Consumer的API,使得我们可以方便地发送和接收消息。例如,创建一个Producer,我们需要配置Broker的地址、主题等信息,然后调用send()方法将消息发送到指定主题。对于Consumer,我们可以通过...
这个名为"Kafka-producer-consumer"的项目显然关注的是Kafka中的生产者(Producer)和消费者(Consumer)组件,这两部分是Kafka生态系统中的核心元素。在这里,我们将深入探讨Kafka的生产者与消费者模型,以及如何...
除了基本的生产和消费功能,Kafka还支持一些高级特性,如幂等性生产者、事务性消费者、连接器(Connectors)以及Kafka Streams,这些都可能在"Kafka-java-demo"中有所体现,帮助你更好地理解和应用Kafka。...
kettle7.1版本整合kafka,kafka插件包含生产者、消费者。直接在kettle安装目录plugins下创建steps目录,并解压下载文件到kettle/plugins/steps目录。具体可查看我博文。
包括生产者客户端(Producer API)和消费者客户端(Consumer API),用于发布消息到主题(Topics)和订阅/消费消息。 3. **Java开发Kafka生产者**: - 使用`KafkaProducer`类创建生产者实例,需要配置如bootstrap ...
阿里云消息队列Kafka Demo是一个基于Scala编程语言实现的应用示例,旨在帮助开发者了解如何在阿里云环境中使用Kafka进行消息生产和消费。Kafka是一种分布式流处理平台,常用于构建实时数据管道和流应用,它能够高效...
Kafka 使用手册 Kafka 是一个高性能、分布式的消息队列系统,广泛应用于大数据处理、实时数据处理和流式数据处理等领域。本文档将详细介绍 Kafka 的安装步骤、基本操作命令和配置文件的修改,以便让初学者快速入门...
Kafka是一个分布式流处理平台,它允许我们创建发布(Producer)和订阅(Consumer)主题(Topic)。主题是逻辑上的分类,类似于数据库中的表。发布者向主题发布消息,而消费者则从主题订阅并消费这些消息。Kafka通过...