`
z19910509
  • 浏览: 50440 次
  • 性别: Icon_minigender_1
  • 来自: 济南
社区版块
存档分类
最新评论

kafka java代码的使用[Producer和Consumer]

阅读更多

用java代码对kafka消息进行消费与发送,首先我们得引入相关jar包

 

maven:

 

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.1</version>
</dependency>

 

gradle:

 

compile("org.apache.kafka:kafka_2.10:0.8.2.1")
 

 

在新版本的kafka中(具体版本记不清楚了),添加了java代码实现的producer,consumer目前还是Scala的,之前的producer和consumer均是Scala编写的,在这里则介绍java版本的producer。

 

另一点需要特别注意:

当发送消息时我们不指定key时producer将消息分发到各partition的机制是:

Scala版本的producer在你的producer启动的时候,随机获得一个partition,然后后面的消息都会发送到这个partition,也就是说,只要程序启动了,这个producer都会往同一个partition里发送消息

java版本的producer会轮询每个partition,所以发送的会比较平均

 

所以当使用Scala版本的producer时,尽量传入key,保证消息在partition的平均性

 

下面是具体的代码:

 

package cn.qlt.study.kafka;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.commons.lang.SerializationUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import cn.qlt.study.domain.User;

public class KafkaUtil {

	
	private static KafkaProducer<String, byte[]> producer=null;

	private static ConsumerConnector consumer=null;
	
	static{
		//生产者配置文件,具体配置可参考ProducerConfig类源码,或者参考官网介绍
		Map<String,Object> config=new HashMap<String, Object>();
		//kafka服务器地址
		config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.100.90:9092,192.168.100.91:9092");
		//kafka消息序列化类 即将传入对象序列化为字节数组
		config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
		//kafka消息key序列化类 若传入key的值,则根据该key的值进行hash散列计算出在哪个partition上
		config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		config.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5);
		//往kafka服务器提交消息间隔时间,0则立即提交不等待
		config.put(ProducerConfig.LINGER_MS_CONFIG,0);
		//消费者配置文件
		Properties props = new Properties();
		//zookeeper地址
	    props.put("zookeeper.connect", "192.168.100.90:2181");
	    //组id
	    props.put("group.id", "123");
	    //自动提交消费情况间隔时间
	    props.put("auto.commit.interval.ms", "1000");
	    
	    ConsumerConfig consumerConfig=new ConsumerConfig(props);
		producer=new KafkaProducer<String,byte[]>(config);
		consumer=Consumer.createJavaConsumerConnector(consumerConfig);
	}

	/**
	 *启动一个消费程序 
	* @param topic 要消费的topic名称
	* @param handler 自己的处理逻辑的实现
	* @param threadCount 消费线程数,该值应小于等于partition个数,多了也没用
	 */
	public static <T extends Serializable>void startConsumer(String topic,final MqMessageHandler<T> handler,int threadCount) throws Exception{
		if(threadCount<1)
			throw new Exception("处理消息线程数最少为1");
	   //设置处理消息线程数,线程数应小于等于partition数量,若线程数大于partition数量,则多余的线程则闲置,不会进行工作
	   //key:topic名称 value:线程数
	   Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
	   topicCountMap.put(topic, new Integer(threadCount));
	   Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
	   //声明一个线程池,用于消费各个partition
	   ExecutorService executor=Executors.newFixedThreadPool(threadCount);
	   //获取对应topic的消息队列
	   List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
	   //为每一个partition分配一个线程去消费
	   for (final KafkaStream stream : streams) {
		   executor.execute(new Runnable() {
			@Override
			public void run() {
			     ConsumerIterator<byte[], byte[]> it = stream.iterator();
			     //有信息则消费,无信息将会阻塞
			     while (it.hasNext()){
			    	 T message=null;
					try {
						//将字节码反序列化成相应的对象
						byte[] bytes=it.next().message();
						message = (T) SerializationUtils.deserialize(bytes);
					} catch (Exception e) {
						e.printStackTrace();
						return;
					}
			    	//调用自己的业务逻辑
			    	try {
						handler.handle(message);
					} catch (Exception e) {
						e.printStackTrace();
					}
			     }
			}
		});
       }
	}
	/**
	 *发送消息,发送的对象必须是可序列化的 
	 */
	public static Future<RecordMetadata> send(String topic,Serializable value) throws Exception{
		try {
			//将对象序列化称字节码
			byte[] bytes=SerializationUtils.serialize(value);
			Future<RecordMetadata> future=producer.send(new ProducerRecord<String,byte[]>(topic,bytes));
			return future;
		}catch(Exception e){
			throw e;
		}
	}
	
	//内部抽象类 用于实现自己的处理逻辑
	public static abstract class MqMessageHandler<T extends Serializable>{
		public abstract void handle(T message);
	}
	
	
	public static void main(String[] args) throws Exception {
		//发送一个信息
		send("test",new User("id","userName", "password"));
		//为test启动一个消费者,启动后每次有消息则打印对象信息
		KafkaUtil.startConsumer("test", new MqMessageHandler<User>() {
			@Override
			public void handle(User user) {
				//实现自己的处理逻辑,这里只打印出消息
				System.out.println(user.toString());
			}
		},2);
	}
}

 

相关配置解释:

producer:

1、producer的配置不需要zookeeper地址,会直接获取kafka的元数据,直接和broker进行通信

 

2、ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG即value.serializer,kafka生产者与broker之间数据是以byte进行传递的,所以这个参数的意思是把我们传入对象转换成byte[]的类,一般使用org.apache.kafka.common.serialization.ByteArraySerializer即可,我们自己把对象序列化为byte[]

 

3、ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG即key.serializer,首先说明下key值是干什么的,若我们指定了key的值,生产者则会根据该key进行hash散列计算出具体的partition。若不指定,则随机选择partition。一般情况下我们没必要指定该值。这个类与上面功能一样,即将key转换成byte[]

 

4、ProducerConfig.LINGER_MS_CONFIG即linger.ms,为了减少请求次数提高吞吐率,这个参数为每次提交间隔的次数,若设置了该值,如1000,则意味着我们的消息可能不会马上提交到kafka服务器,需要等上1秒中,才会进行批量提交。我们可以适当的配置该值。0为不等待立刻提交。

 

consumer:

1、zookeeper.connect:zookeeper的地址,多个之间用,分割

 

2、group.id:这个值可以随便写,但建议写点有意义的值,别随便写个123。kafka保证同一个组内的消息只会被消费一次,若需要重复消费消息,则可以配置不同的groupid。

 

3、auto.commit.interval.ms:consumer自己会记录消费的偏移量,并定时往zookeeper上提交,该值即为提交时间间隔,若该值设置太大可能会出现重复消费的情况,如我们停止了某个consumer,但该consumer还未往zookeeper提交某段时间的消费记录,这导致我们下次启动该消费者的时候,它会从上次提交的偏移量进行消费,这就导致了某些数据的重复消费。

 

注意:在杀死consumer进程后,应等一会儿再去重启,因为杀死consumer进程时,会删除zookeeper的一些临时节点,若我们马上重启的话,可能会在启动的时候那些节点还没删除掉,出现写不必要的错误

分享到:
评论

相关推荐

    kafka-java-demo 基于java的kafka生产消费者示例

    在Maven的pom.xml文件中,你需要添加Kafka的Java客户端依赖,以便项目可以编译和运行Kafka相关的代码。 【Kafka的其他特性】 除了基本的生产和消费功能,Kafka还支持一些高级特性,如幂等性生产者、事务性消费者、...

    kafka学习代码(java开发kafka)

    在" kafka-study "压缩包中,你可能会找到一些示例代码,这些代码展示了如何使用Java API创建Kafka生产者和消费者,以及如何处理错误和配置选项。通过研究这些示例,你可以更好地理解和应用Kafka的Java开发实践。...

    spring-kafka-producer-consumer-example_java_springboot_kafka_

    标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...

    kafka java 下载的jar

    Kafka的核心组件包括生产者(Producer)、消费者(Consumer)和代理(Broker)。生产者负责发布消息到主题(Topic),消费者则订阅并消费这些消息,而代理是Kafka集群中的节点,它们接收、存储和转发消息。 在Java...

    Kafka-Simple-Producer-Consumer:使用Java 8的kafka的生产者和消费者的简单变化

    在"Kafka-Simple-Producer-Consumer-master"这个项目中,可能包含了示例代码,展示了如何使用Java 8编写Kafka的生产者和消费者。这些示例通常会包含以下部分: - 生产者类(ProducerClass.java):创建并配置...

    kafka-java-demo 基于java的kafka生产消费者例子

    "Kafka-java-demo"项目是一个简单的示例,演示了如何使用Java API创建Kafka生产者和消费者。项目中通常会包含以下关键部分: - **Producer示例**:展示如何创建一个Kafka生产者,设置必要的配置(如服务器地址、...

    kafka的java依赖包

    在Java开发环境中,Kafka作为一个分布式流处理平台,被广泛用于构建实时数据管道和流应用。这个"Kafka的Java依赖包"包含了所有你需要在Java项目中与Kafka进行本地交互所需的jar包。这些jar包提供了完整的API,使得...

    kafka java小例子

    在本文中,我们将深入探讨如何使用Java来实现Apache Kafka,这是一个强大的分布式消息系统。Kafka以其高吞吐量、持久...记住,理解和掌握Kafka的核心概念以及Java API的使用,对于在大数据和实时分析领域工作至关重要。

    Kafka使用Java客户端进行访问的示例代码

    Kafka 使用 Java 客户端进行访问的示例代码 ...本文介绍了使用 Java 客户端来访问 Kafka 的示例代码,包括生产者代码和消费者代码。这些代码可以作为开发者使用 Java 客户端来访问 Kafka 的参考。

    kafka java maven例子

    在Java开发环境中,Apache Kafka是一个广泛使用的分布式流处理平台,它允许开发者构建实时数据管道和流应用程序。这个“kafka java maven例子”是针对初学者的一个很好的起点,旨在帮助理解如何使用Java和Maven来...

    kafka的java的jar包

    总之,"kafka的java的jar包"是Java开发者构建与Kafka交互应用的基础,通过理解和熟练使用这些库,可以构建出高效、可靠的实时数据处理系统。在大数据时代,掌握Kafka的Java API是成为一名合格的数据工程师所必备的...

    Springboot集成Kafka实现producer和consumer的示例代码

    Spring Boot 集成 Kafka 可以帮助开发者轻松地在应用程序中实现消息生产和消费功能,利用 Kafka 的高性能和可扩展性。Kafka 是一个分布式流处理平台,它最初由 LinkedIn 开发,后来成为 Apache 软件基金会的顶级项目...

    Kafka示例代码

    通过提供的Kafka示例代码,你可以学习如何在Java和Scala环境中设置和操作Kafka的生产者和消费者,理解Kafka与Hadoop的集成方式,以及如何在Scala这样的函数式语言中优雅地使用Kafka API。这些知识对于构建实时数据...

    Kafka中生产者和消费者java实现

    本篇文章将深入探讨如何在Java环境中使用IDEA,通过Maven构建工具来实现Kafka的生产者和消费者。 首先,我们需要设置项目环境。使用IntelliJ IDEA创建一个新的Java Maven项目,然后在pom.xml文件中添加Kafka相关的...

    kafka java依赖包

    在Java开发中,Apache Kafka是一个广泛使用的分布式流处理平台,它允许开发者构建实时数据管道和流应用程序。Kafka的核心功能包括发布订阅消息系统、高吞吐量的数据持久化以及跨多个节点的数据复制。本篇文章将深入...

    apache-kafka-1.0.0 java Demo

    在Java代码中,我们需要创建一个`KafkaProducer`实例,指定配置参数,如bootstrap servers(Kafka集群的地址)、key和value的序列化类等。然后,你可以调用`send`方法发送消息到指定的主题。 消费者API则负责接收和...

    java开发kafka-clients所需要的所有jar包以及源码

    在Java中使用Kafka-clients,你需要包含相应的jar包,这些包包含了运行和调试所需的类和方法。下面将详细介绍这个过程及其相关知识点。 1. **Kafka概述**: Apache Kafka是一个分布式流处理平台,用于构建实时数据...

    kafka基础代码实现

    Kafka的核心组件包括生产者(Producer)、消费者(Consumer)、主题(Topic)和 broker。生产者负责发布消息到主题,消费者则从主题中消费消息。每个主题可以分为多个分区(Partition),保证了数据的可扩展性和并行处理能力...

    spring-kafka源代码

    1. `org.springframework.kafka.core`:这是Spring Kafka的核心模块,包含Producer和Consumer的配置和实现。 2. `org.springframework.kafka.config`:提供了声明式配置支持,用于创建Kafka的生产者和消费者。 3. `...

    java实现flink订阅Kerberos认证的Kafka消息示例源码

    为了在Flink中安全地连接到Kafka,我们需要使用Kerberos协议,这是一个广泛采用的网络身份验证协议,可以提供互操作性和可扩展性。 1. **Kerberos基础** - Kerberos基于票证验证机制,确保只有经过身份验证的用户...

Global site tag (gtag.js) - Google Analytics