`

Java连接Kafka

 
阅读更多

原创转载请注明出处:http://agilestyle.iteye.com/blog/2393990

 

前期准备

Kafka集群搭建完毕

 

Project Directory


 

Maven Dependency

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.fool.kafka</groupId>
    <artifactId>java-kafka</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

Producer

package org.fool.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;


public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-0004:9092,hadoop-0005:9092,hadoop-0006:9092");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        ProducerRecord<String, String> record = new ProducerRecord<>("test", "Hello World");

        producer.send(record);

        producer.close();
    }
}

 

Consumer

package org.fool.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;


public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop-0004:9092,hadoop-0005:9092,hadoop-0006:9092");
        props.put("group.id", "testGroup");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
            }
        }
    }
}

 

Run

先Run Consumer,再Run Producer

Producer Console


Consumer Console


 

 

  • 大小: 10.6 KB
  • 大小: 22.6 KB
  • 大小: 23.7 KB
分享到:
评论

相关推荐

    kafka生产者连接池

    为了优化性能和提高效率,开发人员常常会利用连接池技术来管理Kafka生产者的连接。本文将深入探讨"Kafka生产者连接池"的概念、实现原理以及它如何提升系统性能。 Kafka生产者连接池是一种资源复用机制,它允许多个...

    kafka-java-demo 基于java的kafka生产消费者示例

    除了基本的生产和消费功能,Kafka还支持一些高级特性,如幂等性生产者、事务性消费者、连接器(Connectors)以及Kafka Streams,这些都可能在"Kafka-java-demo"中有所体现,帮助你更好地理解和应用Kafka。...

    kafka集群Java开发jar包

    4. **Kafka API配置**:在Java中使用Kafka,需要正确配置各种参数,如`bootstrap.servers`(Kafka集群的地址)、`group.id`(消费者的组ID)等,以确保正确连接和操作Kafka集群。 5. **序列化和反序列化**:Kafka...

    kafka-java-demo_java连接卡夫卡_steamyx8_DEMO_kafka_breathsru_

    对于Java连接Kafka,我们首先需要在项目中引入相关的依赖。如果你使用的是Maven,可以在pom.xml文件中添加以下依赖: ```xml &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-clients &lt;version&gt;2.8.0&lt;/version&gt; ...

    kafka-java-demo 基于java的kafka生产消费者例子

    - 消费者会话和心跳:保持消费者与Kafka集群的连接,确保高可用性。 通过深入学习和实践"Kafka-java-demo"项目,你将对基于Java的Kafka生产和消费有更深入的理解,从而能够在实际项目中灵活运用Kafka的功能。

    java_kafka交互工具类.rar

    - **bootstrap.servers**: 连接Kafka集群的地址列表,用于初始化连接。 - **key.serializer/deserializer**: 关键字序列化和反序列化的类,如StringSerializer/Deserializer。 - **value.serializer/deserializer...

    java操作kafka

    1. **配置**: 生产者需要配置一些参数,如bootstrap.servers(连接Kafka集群的服务器列表)、key.serializer和value.serializer(消息的序列化方式)等。 2. **创建生产者**: 使用`new KafkaProducer(props)`初始化...

    kafka_2.11-1.0.0 版本,java,spring连接实例程序

    2. **使用Java连接Kafka**: - 引入Kafka的Java客户端库。在`pom.xml`或`build.gradle`文件中添加依赖项,如`org.apache.kafka:kafka-clients`。 - 创建Kafka生产者:定义配置属性,如bootstrap servers、key和...

    2、java调用kafka api

    创建用于连接Kafka的Properties配置 Properties props = new Properties(); props.put("bootstrap.servers", "server1:9092"); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka....

    java实现kafka生产消费数据接口

    8. **连接器(Connectors)**:Kafka Connect允许轻松地将Kafka与其他系统集成,如数据库、日志文件等。 在实际应用中,还需要关注性能优化、监控和错误处理等方面,确保Kafka系统的稳定性和效率。了解并掌握这些...

    kafka的java依赖包

    此外,Kafka还支持更高级的功能,如幂等性生产者、消费者位移提交、连接管理、容错机制等。在实际开发中,开发者可以根据需求选择合适的API和配置,确保应用能够高效、稳定地与Kafka集群交互。 总之,这个"Kafka的...

    kafka-clients-2.5.1.jar

    java连接Kafka jar

    kafka、zookeep、hadoop集群搭建和java操作kafka.zip

    6. **安装Kafka**:下载Kafka 2.12的zip文件,配置`server.properties`,包括设置Zookeeper连接地址和broker ID,启动Kafka服务。 7. **创建Kafka主题**:使用Kafka命令行工具创建主题,例如`kafka-topics.sh --...

    kafka需要的jar包集合

    1. **kafka-clients.jar**:这是Kafka Java客户端的核心库,包含了生产者、消费者以及其他客户端API,用于连接Kafka服务器,发送和接收消息。 2. **slf4j-api.jar**:简单日志门面(SLF4J)是一个用于各种日志框架...

    Java实现Kafka数据生产者

    "Java实现Kafka数据生产者"这个功能可以让你使用Java编程语言创建一个能够连接到指定的Kafka集群(通过bootstrap servers)并向其中发送数据的应用程序。通过这个功能,你可以将数据以消息的形式发送到Kafka主题中,...

    Maven搭建Kafka Java开发环境需要的jar包

    这里,`kafka-clients`提供了客户端API,用于连接和操作Kafka集群,而`kafka_2.13`包含了Kafka服务器端的组件,适用于Scala 2.13的版本。当然,具体版本应根据实际需求和兼容性选择。 描述中提到的"直接解压到maven...

    kafka集群搭建和使用Java写kafka生产者消费者

    在本文中,我们将深入探讨如何搭建Kafka集群以及如何使用Java编写Kafka的生产者和消费者。Kafka是由LinkedIn开发并贡献给Apache软件基金会的消息队列系统,它被广泛用于实时数据流处理和大数据分析。 ### Kafka集群...

    kafka连接工具客户端.rar

    3. **使用方法**:在Windows环境下,通常需要先设置环境变量,将Kafka和Java的安装路径添加到PATH中,然后解压并运行客户端工具。对于图形化工具,通常只需按照界面提示进行操作即可。例如,创建新主题时,选择相应...

    Java连接池的实现

    Java连接池是优化数据库操作的关键技术之一,它允许应用程序高效地管理和重用数据库连接,避免了频繁创建和关闭连接导致的性能开销。本篇将详细介绍如何在Java中实现一个简单的连接池,以及相关的概念和技术。 首先...

    Java实现Kafka生产者消费者代码实例

    在Java中实现Kafka的生产者和消费者,需要依赖Kafka提供的客户端库。Maven项目中通常需要在pom.xml文件中加入kafka-clients依赖,如下: ```xml &lt;groupId&gt;org.apache.kafka &lt;artifactId&gt;kafka-clients ...

Global site tag (gtag.js) - Google Analytics