在学习笔记(一)中,讲解了kafka的安装、部署、以及bash下进行的一些简单操作,而这次将学习kafka的java客户端代码。
1、jar包。
在maven上,我们有两种apache kafka提供的jar包:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.9.0.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.9.0.1</version> </dependency>
这里我们选择的是kafka-clients,因为kafka-clients比kafka_2.11依赖的jar少,而且对于Consumer,没有了低级别api与高级别api的区分,方便了代码的编写。
2、Producer。
接下来我们编写Produer的java代码:
public class Producer extends Thread { private final KafkaProducer producer; public Producer(){ Properties props = new Properties(); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("bootstrap.servers", "localhost:9092"); this.producer = new KafkaProducer(props); } @Override public void run() { int messageNo = 1; while (true) { String messageStr = "Message_" + messageNo; System.out.println("Send:" + messageStr); producer.send(new ProducerRecord("my_test", messageStr)); messageNo++; try { sleep(20); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } }
在构造函数中,我们创建一个KafkaProducer的实例,props中为必要的参数(已经最少了,不能更少)。
为了一会方便运行,我们继承了Thread类,并且重写了run。在send中还能添加一个callback回调方法,可以在你的IDE中看到这个参数,如果你的业务有需要的话,可以进行定制。
对于producer参数配置,将在后续的笔记中介绍。
3、Consumer。
然后是Consumer:
public class Consumer extends Thread { private final KafkaConsumer<String, String> consumer; public Consumer (){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test-consumer-group"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer(props); } @Override public void run() { this.consumer.subscribe(Arrays.asList("my_test")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.println("receive:" + record.value()); } } } }
与Producer类似,我们继承了Thread,并且在构造函数中创建了KafkaConsumer的实例。
consumer可以通过subscribe订阅想要的topic,而poll方法能够拉取消息,参数为超时时间,单位为millisenonds,如果指定时间未拉取到消息,返回ConsumerRecords.empty()。
4、进行测试:
public class KafkaConsumerProducerDemo { public static void main(String[] args) { new Producer().start(); new Consumer().start(); } }
在控制台输出如下:
Send:Message_506
receive:Message_506
Send:Message_507
receive:Message_507
Send:Message_508
receive:Message_508
Send:Message_509
receive:Message_509
Send:Message_510
receive:Message_510
至此,java客户端kafak代码就编写完成了。需要注意的是,对于consumer,要保证消息的处理速度能够跟上producer的生产速度,可以根据业务复杂程度与可控制程度,选择合适的线程方式处理消息(例如线程池,或者是actor)。
相关推荐
本篇文章将深入探讨“java后端学习笔记”中的关键知识点,包括消息队列(Message Queuing)服务如ActiveMQ和RabbitMQ、工作流引擎Activity、容器化技术Docker、分布式服务框架Dubbo、网络库Netty、RPC(Remote ...
集合是Java编程中不可或缺的一部分,...这些学习笔记涵盖了Java开发、分布式系统、大数据处理和实时流计算等多个领域,是提升技术能力的好资源。通过深入学习和实践,你可以构建扎实的技术基础,适应不断变化的IT环境。
Java学习笔记涵盖了许多核心的IT知识点,以下是这些主题的详细说明: **JVM(Java虚拟机)** Java虚拟机是Java程序运行的基础,它负责解释和执行字节码。理解JVM的工作原理对于优化程序性能至关重要。这包括类加载...
"从0开始用Java做智慧物联网"的主题,正是引导我们学习如何利用Java来构建物联网应用。 Java在物联网中的应用主要体现在以下几个方面: 1. 设备控制:Java的API和库可以用来编写控制物联网设备的代码,如GPIO...
在“Java-Summarize-code”这个压缩包中,可能包含了对以上知识点的代码示例和学习笔记,通过阅读和实践,可以进一步巩固和深化Java核心技术的理解。无论是初学者还是经验丰富的开发者,持续学习和实践这些知识点都...
### 大数据学习笔记知识点概览 #### 第一部分:Spark学习 ##### 第1章:Spark介绍 - **1.1 Spark简介与发展** - **背景**:随着大数据处理需求的增长,传统的Hadoop MapReduce框架虽然提供了强大的计算能力,但...
SpringCloud是中国Java开发者广泛使用的微服务框架之一,它基于Spring Boot进行快速...通过学习周阳老师的SpringCloud第一季笔记,你将能够系统地理解和应用SpringCloud技术,为构建现代化的微服务架构打下坚实基础。
# SpringCloud课堂笔记知识点解析 ## 一、微服务概述 ### 1.1 微服务定义 微服务架构作为一种新兴的设计模式,旨在通过将单个应用程序分解为多个小型、独立的服务来提升软件的可扩展性和灵活性。这些服务通常遵循...
【标题】"server.rar" 涉及到的是一个基于Java技术实现的即时通讯系统...总的来说,这个项目涵盖了从网络编程、并发处理、数据库操作到用户交互等多个方面的Java技术,是一个全面学习和实践Java即时通讯应用的好案例。
这个项目的重点是作者分享他在IT领域的学习笔记,涵盖了多个关键领域。 【并发相关】这部分内容可能涉及多线程编程和并发控制。在计算机科学中,并发是指系统中多个执行单元同时进行工作的情况。这包括线程、进程...
在"stage5_micro-service"这个压缩包文件中,很可能包含了以上知识点的实践代码示例,通过学习和运行这些代码,你可以更深入地理解微服务架构的各种技术和工作原理,进一步提升你的Java微服务开发能力。
标题“cloudLearn:记录自己学习周阳老师的springCloud的历程”表明这是一个关于学习 Spring Cloud 的过程记录,可能包含了笔记、代码示例以及个人理解和实践。描述中的“springCloudLearn”进一步确认了这是围绕 ...
这个"webapp.zip"压缩包文件可能包含了一个完整的前端界面和相关的后端服务代码,用于处理文件上传功能。以下是对这个主题的详细解释: 一、Spring Boot后端处理 在Spring Boot中,我们可以使用`MultipartFile`接口...
潮乎盲盒系统是一款流行的在线盲盒购物平台的源代码,它允许开发者或者商家构建一个类似于潮流玩具、限量商品随机售卖的电商平台。源码的提供包括前端和后端两部分,意味着用户可以全面掌握系统的运行机制,并根据...