原创转载请注明出处:http://agilestyle.iteye.com/blog/2393990
前期准备
Project Directory
Maven Dependency
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.fool.kafka</groupId> <artifactId>java-kafka</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
Producer
package org.fool.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop-0004:9092,hadoop-0005:9092,hadoop-0006:9092"); props.put("acks", "all"); props.put("retries", 3); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); ProducerRecord<String, String> record = new ProducerRecord<>("test", "Hello World"); producer.send(record); producer.close(); } }
Consumer
package org.fool.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerExample { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "hadoop-0004:9092,hadoop-0005:9092,hadoop-0006:9092"); props.put("group.id", "testGroup"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } }
Run
先Run Consumer,再Run Producer
Producer Console
Consumer Console
相关推荐
为了优化性能和提高效率,开发人员常常会利用连接池技术来管理Kafka生产者的连接。本文将深入探讨"Kafka生产者连接池"的概念、实现原理以及它如何提升系统性能。 Kafka生产者连接池是一种资源复用机制,它允许多个...
除了基本的生产和消费功能,Kafka还支持一些高级特性,如幂等性生产者、事务性消费者、连接器(Connectors)以及Kafka Streams,这些都可能在"Kafka-java-demo"中有所体现,帮助你更好地理解和应用Kafka。...
4. **Kafka API配置**:在Java中使用Kafka,需要正确配置各种参数,如`bootstrap.servers`(Kafka集群的地址)、`group.id`(消费者的组ID)等,以确保正确连接和操作Kafka集群。 5. **序列化和反序列化**:Kafka...
对于Java连接Kafka,我们首先需要在项目中引入相关的依赖。如果你使用的是Maven,可以在pom.xml文件中添加以下依赖: ```xml <groupId>org.apache.kafka <artifactId>kafka-clients <version>2.8.0</version> ...
- 消费者会话和心跳:保持消费者与Kafka集群的连接,确保高可用性。 通过深入学习和实践"Kafka-java-demo"项目,你将对基于Java的Kafka生产和消费有更深入的理解,从而能够在实际项目中灵活运用Kafka的功能。
- **bootstrap.servers**: 连接Kafka集群的地址列表,用于初始化连接。 - **key.serializer/deserializer**: 关键字序列化和反序列化的类,如StringSerializer/Deserializer。 - **value.serializer/deserializer...
1. **配置**: 生产者需要配置一些参数,如bootstrap.servers(连接Kafka集群的服务器列表)、key.serializer和value.serializer(消息的序列化方式)等。 2. **创建生产者**: 使用`new KafkaProducer(props)`初始化...
2. **使用Java连接Kafka**: - 引入Kafka的Java客户端库。在`pom.xml`或`build.gradle`文件中添加依赖项,如`org.apache.kafka:kafka-clients`。 - 创建Kafka生产者:定义配置属性,如bootstrap servers、key和...
创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "server1:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka....
8. **连接器(Connectors)**:Kafka Connect允许轻松地将Kafka与其他系统集成,如数据库、日志文件等。 在实际应用中,还需要关注性能优化、监控和错误处理等方面,确保Kafka系统的稳定性和效率。了解并掌握这些...
此外,Kafka还支持更高级的功能,如幂等性生产者、消费者位移提交、连接管理、容错机制等。在实际开发中,开发者可以根据需求选择合适的API和配置,确保应用能够高效、稳定地与Kafka集群交互。 总之,这个"Kafka的...
java连接Kafka jar
6. **安装Kafka**:下载Kafka 2.12的zip文件,配置`server.properties`,包括设置Zookeeper连接地址和broker ID,启动Kafka服务。 7. **创建Kafka主题**:使用Kafka命令行工具创建主题,例如`kafka-topics.sh --...
1. **kafka-clients.jar**:这是Kafka Java客户端的核心库,包含了生产者、消费者以及其他客户端API,用于连接Kafka服务器,发送和接收消息。 2. **slf4j-api.jar**:简单日志门面(SLF4J)是一个用于各种日志框架...
"Java实现Kafka数据生产者"这个功能可以让你使用Java编程语言创建一个能够连接到指定的Kafka集群(通过bootstrap servers)并向其中发送数据的应用程序。通过这个功能,你可以将数据以消息的形式发送到Kafka主题中,...
这里,`kafka-clients`提供了客户端API,用于连接和操作Kafka集群,而`kafka_2.13`包含了Kafka服务器端的组件,适用于Scala 2.13的版本。当然,具体版本应根据实际需求和兼容性选择。 描述中提到的"直接解压到maven...
3. **使用方法**:在Windows环境下,通常需要先设置环境变量,将Kafka和Java的安装路径添加到PATH中,然后解压并运行客户端工具。对于图形化工具,通常只需按照界面提示进行操作即可。例如,创建新主题时,选择相应...
在本文中,我们将深入探讨如何搭建Kafka集群以及如何使用Java编写Kafka的生产者和消费者。Kafka是由LinkedIn开发并贡献给Apache软件基金会的消息队列系统,它被广泛用于实时数据流处理和大数据分析。 ### Kafka集群...
Java连接池是优化数据库操作的关键技术之一,它允许应用程序高效地管理和重用数据库连接,避免了频繁创建和关闭连接导致的性能开销。本篇将详细介绍如何在Java中实现一个简单的连接池,以及相关的概念和技术。 首先...
在Java中实现Kafka的生产者和消费者,需要依赖Kafka提供的客户端库。Maven项目中通常需要在pom.xml文件中加入kafka-clients依赖,如下: ```xml <groupId>org.apache.kafka <artifactId>kafka-clients ...