`
zk_chs
  • 浏览: 215985 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

kafka 学习笔记(二) java客户端代码

阅读更多

学习笔记(一)中,讲解了kafka的安装、部署、以及bash下进行的一些简单操作,而这次将学习kafka的java客户端代码。

 

1、jar包。

在maven上,我们有两种apache kafka提供的jar包:

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>0.9.0.1</version>
</dependency>
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka_2.11</artifactId>
	<version>0.9.0.1</version>
</dependency>

这里我们选择的是kafka-clients,因为kafka-clients比kafka_2.11依赖的jar少,而且对于Consumer,没有了低级别api与高级别api的区分,方便了代码的编写。

 

2、Producer。

接下来我们编写Produer的java代码:

public class Producer extends Thread {

    private final KafkaProducer producer;

    public Producer(){
        Properties props = new Properties();
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("bootstrap.servers", "localhost:9092");
        this.producer = new KafkaProducer(props);
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = "Message_" + messageNo;
            System.out.println("Send:" + messageStr);
            producer.send(new ProducerRecord("my_test", messageStr));
            messageNo++;
            try {
                sleep(20);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
}

在构造函数中,我们创建一个KafkaProducer的实例,props中为必要的参数(已经最少了,不能更少)。

为了一会方便运行,我们继承了Thread类,并且重写了run。在send中还能添加一个callback回调方法,可以在你的IDE中看到这个参数,如果你的业务有需要的话,可以进行定制。

 

对于producer参数配置,将在后续的笔记中介绍。

 

3、Consumer。

然后是Consumer:

public class Consumer extends Thread {

    private final KafkaConsumer<String, String> consumer;

    public Consumer (){
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-consumer-group");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer(props);
    }

    @Override
    public void run() {
        this.consumer.subscribe(Arrays.asList("my_test"));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records){
                System.out.println("receive:" + record.value());
            }
        }
    }
}

与Producer类似,我们继承了Thread,并且在构造函数中创建了KafkaConsumer的实例。

consumer可以通过subscribe订阅想要的topic,而poll方法能够拉取消息,参数为超时时间,单位为millisenonds,如果指定时间未拉取到消息,返回ConsumerRecords.empty()。

 

4、进行测试:

public class KafkaConsumerProducerDemo {

    public static void main(String[] args) {
        new Producer().start();
        new Consumer().start();
    }

}

 

在控制台输出如下:

Send:Message_506
receive:Message_506
Send:Message_507
receive:Message_507
Send:Message_508
receive:Message_508
Send:Message_509
receive:Message_509
Send:Message_510
receive:Message_510

 

至此,java客户端kafak代码就编写完成了。需要注意的是,对于consumer,要保证消息的处理速度能够跟上producer的生产速度,可以根据业务复杂程度与可控制程度,选择合适的线程方式处理消息(例如线程池,或者是actor)。

 

0
5
分享到:
评论

相关推荐

    java后端学习笔记

    本篇文章将深入探讨“java后端学习笔记”中的关键知识点,包括消息队列(Message Queuing)服务如ActiveMQ和RabbitMQ、工作流引擎Activity、容器化技术Docker、分布式服务框架Dubbo、网络库Netty、RPC(Remote ...

    集合、NIO、Netty、Thread、MySql、Hive、HBase、Kafka、Spark、Fink等学习笔记.zip

    集合是Java编程中不可或缺的一部分,...这些学习笔记涵盖了Java开发、分布式系统、大数据处理和实时流计算等多个领域,是提升技术能力的好资源。通过深入学习和实践,你可以构建扎实的技术基础,适应不断变化的IT环境。

    java 学习笔记包括 JVM 集合 Spring ,Spring cloud, Mysql ,redis ,kaf.zip

    Java学习笔记涵盖了许多核心的IT知识点,以下是这些主题的详细说明: **JVM(Java虚拟机)** Java虚拟机是Java程序运行的基础,它负责解释和执行字节码。理解JVM的工作原理对于优化程序性能至关重要。这包括类加载...

    从0开始用Java做智慧物联网.zip

    "从0开始用Java做智慧物联网"的主题,正是引导我们学习如何利用Java来构建物联网应用。 Java在物联网中的应用主要体现在以下几个方面: 1. 设备控制:Java的API和库可以用来编写控制物联网设备的代码,如GPIO...

    Java核心知识点记录学习

    在“Java-Summarize-code”这个压缩包中,可能包含了对以上知识点的代码示例和学习笔记,通过阅读和实践,可以进一步巩固和深化Java核心技术的理解。无论是初学者还是经验丰富的开发者,持续学习和实践这些知识点都...

    大数据学习笔记

    ### 大数据学习笔记知识点概览 #### 第一部分:Spark学习 ##### 第1章:Spark介绍 - **1.1 Spark简介与发展** - **背景**:随着大数据处理需求的增长,传统的Hadoop MapReduce框架虽然提供了强大的计算能力,但...

    尚硅谷周阳SpringCloud第一季笔记(超详细非官方手工笔记)

    SpringCloud是中国Java开发者广泛使用的微服务框架之一,它基于Spring Boot进行快速...通过学习周阳老师的SpringCloud第一季笔记,你将能够系统地理解和应用SpringCloud技术,为构建现代化的微服务架构打下坚实基础。

    周阳SpringCloud课堂笔记

    # SpringCloud课堂笔记知识点解析 ## 一、微服务概述 ### 1.1 微服务定义 微服务架构作为一种新兴的设计模式,旨在通过将单个应用程序分解为多个小型、独立的服务来提升软件的可扩展性和灵活性。这些服务通常遵循...

    server.rar_ICQ/即时通讯_Java_

    【标题】"server.rar" 涉及到的是一个基于Java技术实现的即时通讯系统...总的来说,这个项目涵盖了从网络编程、并发处理、数据库操作到用户交互等多个方面的Java技术,是一个全面学习和实践Java即时通讯应用的好案例。

    托马斯·T1an.github.io

    这个项目的重点是作者分享他在IT领域的学习笔记,涵盖了多个关键领域。 【并发相关】这部分内容可能涉及多线程编程和并发控制。在计算机科学中,并发是指系统中多个执行单元同时进行工作的情况。这包括线程、进程...

    micro service

    在"stage5_micro-service"这个压缩包文件中,很可能包含了以上知识点的实践代码示例,通过学习和运行这些代码,你可以更深入地理解微服务架构的各种技术和工作原理,进一步提升你的Java微服务开发能力。

    cloudLearn:记录自己学习周阳老师的springCloud的历程

    标题“cloudLearn:记录自己学习周阳老师的springCloud的历程”表明这是一个关于学习 Spring Cloud 的过程记录,可能包含了笔记、代码示例以及个人理解和实践。描述中的“springCloudLearn”进一步确认了这是围绕 ...

    webapp.zip(使用地点是springboot——跨服务器文件上传)

    这个"webapp.zip"压缩包文件可能包含了一个完整的前端界面和相关的后端服务代码,用于处理文件上传功能。以下是对这个主题的详细解释: 一、Spring Boot后端处理 在Spring Boot中,我们可以使用`MultipartFile`接口...

    最新潮乎盲盒系统源码 附搭建教程.zip

    潮乎盲盒系统是一款流行的在线盲盒购物平台的源代码,它允许开发者或者商家构建一个类似于潮流玩具、限量商品随机售卖的电商平台。源码的提供包括前端和后端两部分,意味着用户可以全面掌握系统的运行机制,并根据...

Global site tag (gtag.js) - Google Analytics