`

kafka环境搭建与实战(2)kafka API实战

阅读更多
kafka环境搭建与实战(1)安装kafka           http://zilongzilong.iteye.com/blog/2267913
kafka环境搭建与实战(2)kafka API实战       http://zilongzilong.iteye.com/blog/2267924

1.maven项目中添加依赖

 

		<!-- kafka -->
		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-clients</artifactId>
			<version>0.9.0.0</version>
		</dependency>

 

2.spring集成kafka

与spring集成spring-kafka.xml

 

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans"
	xsi:schemaLocation="
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd">

	<bean id="testProducer" class="org.apache.kafka.clients.producer.KafkaProducer" scope="prototype">
		<constructor-arg type="java.util.Properties">
			<props>
				<prop key="bootstrap.servers">kafka0:9092,kafka1:9092,kafka2</prop>
				<prop key="acks">all</prop>
				<prop key="retries">0</prop>
				<prop key="batch.size">16384</prop>
				<prop key="linger.ms">1</prop>
				<prop key="buffer.memory">33554432</prop>
				<prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
				<prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
				<prop key="partitioner.class">com.***.kafka.Partitioner.RandomPartitioner</prop>
			</props>
		</constructor-arg>
	</bean>

	<bean id="group1Consumer" class="org.apache.kafka.clients.consumer.KafkaConsumer" scope="prototype">
		<constructor-arg type="java.util.Properties">
			<props>
				<prop key="bootstrap.servers">kafka0:9092,kafka1:9092,kafka2:9092</prop>
				<prop key="group.id">group1</prop>
				<prop key="enable.auto.commit">true</prop>
				<prop key="auto.commit.interval.ms">1000</prop>
				<prop key="session.timeout.ms">30000</prop>
				<prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
				<prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
			</props>
		</constructor-arg>
	</bean>

	<bean id="group2Consumer" class="org.apache.kafka.clients.consumer.KafkaConsumer" scope="prototype">
		<constructor-arg type="java.util.Properties">
			<props>
				<prop key="bootstrap.servers">kafka0:9092,kafka1:9092,kafka2:9092</prop>
				<prop key="group.id">group2</prop>
				<prop key="enable.auto.commit">true</prop>
				<prop key="auto.commit.interval.ms">1000</prop>
				<prop key="session.timeout.ms">30000</prop>
				<prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
				<prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
			</props>
		</constructor-arg>
	</bean>
</beans>

 

3.producer使用

自己的partion策略类

public class RandomPartitioner implements Partitioner {
	@Override
	public void configure(Map<String, ?> configs) {

	}

	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		Random random = new Random();
		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
		int numPartitions = partitions.size();
		if (numPartitions > 0) {
			return Math.abs(random.nextInt()) % numPartitions;
		} else {
			return 0;
		}
	}

	@Override
	public void close() {

	}
}

 

public class ProducerUtil {
	private static Producer<String, String> producer = SpringContextHolder.getBean("testProducer");
	public static void produce(String message) {
		producer.send(new ProducerRecord<String, String>("test",message));
	}
}

 

4.consumer使用

public class KafkaServletContextListener implements ServletContextListener {
	@Override
	public void contextInitialized(ServletContextEvent sce) {
		ExecutorService executor = Executors.newFixedThreadPool(2);
		executor.execute(new Runnable() {
			@Override
			public void run() {
				KafkaConsumer<String, String> consumer = SpringContextHolder.getBean("group1Consumer");
				consumer.subscribe(Arrays.asList("test"));
				while (true) {
					ConsumerRecords<String, String> records = consumer.poll(100);
					for (ConsumerRecord<String, String> record : records) {
						record.key();
						record.offset();
						record.partition();
						record.topic();
						record.value();
						//TODO
					}
				}
			}
		});
		executor.execute(new Runnable() {
			@Override
			public void run() {
				KafkaConsumer<String, String> consumer = SpringContextHolder.getBean("group2Consumer");
				consumer.subscribe(Arrays.asList("test"));
				while (true) {
					ConsumerRecords<String, String> records = consumer.poll(100);
					for (ConsumerRecord<String, String> record : records) {
						record.key();
						record.offset();
						record.partition();
						record.topic();
						record.value();
						//TODO
					}
				}
			}
		});
	}

	@Override
	public void contextDestroyed(ServletContextEvent sce) {
		
	}
}

 

   在web.xml中添加listener

   

	<listener>
		<listener-class>com.***.web.listener.KafkaServletContextListener</listener-class>
	</listener>

 

5.kafka中遇到的错误

    5.1 消费者在消费消息的时候,一直报如下错误:

ILLEGAL_GENERATION occurred while committing offsets for group

    在网上找到一篇文章http://comments.gmane.org/gmane.comp.apache.kafka.user/10708,但是按照这个调整了auto.commit.interval.ms和session.timeout.ms,但是还是无济于事。

    最后的根本解决办法是,优化消费者的处理逻辑,因为我在消费这种用到了jredis,jredis中对于exist、get时间复杂度为o(1),而smembers方法时间复杂度为o(N),我做的是一是优化代码,二是尽量优先用jredis中对于exist、get方法,然后部署上去,问题经观察,没有出现了。

    总之,解决办法是优化消费者代码,减少耗时,让消费者能及时反馈消费状态给zookeeper

 

 

 

 

 

 

 

 

 

 

分享到:
评论

相关推荐

    Kafka源码解析与实战

    ### Kafka源码解析与实战 #### 一、Kafka简介 Kafka是由Apache软件基金会开发的一款开源流处理平台,主要用于构建实时数据管道以及基于流的数据处理应用。它以分布式的方式运行,具有高吞吐量、低延迟的特点,适用...

    Kafka急速入门与实战.doc

    2. **Kafka急速入门与实战** - **Springboot整合Kafka**:Springboot简化了Kafka的集成。通过添加相关依赖,可以轻松创建生产者和消费者。生产者负责发布消息,而消费者负责接收和处理消息。在实际测试中,可以模拟...

    KAFKA分布式消息系统(window)

    **KAFKA分布式消息系统在Windows环境下的搭建与应用** KAFKA是一个高吞吐量的分布式消息系统,由LinkedIn开发并开源,现在是Apache软件基金会的顶级项目。它主要设计用于处理实时流数据,允许应用程序发布和订阅...

    kafka权威指南和源码解析实战

    2. **Kafka安装与配置**:介绍如何搭建Kafka集群,包括服务器配置、Zookeeper集成等。 3. **Kafka API使用**:讲解Java和Scala客户端如何发布和消费消息,以及如何使用命令行工具操作Kafka。 4. **Kafka性能优化**:...

    kafka实战演练快速上手

    1. **搭建环境**:首先在本地或者虚拟机上部署一套Kafka集群。 2. **编写生产者程序**:创建一个简单的Java应用,向`kafka-demo`主题发送消息。 3. **编写消费者程序**:创建另一个Java应用,订阅`kafka-demo`主题...

    kafka 权威指南 2018年1月出版 完整版

    接下来,指南涵盖了安装和配置Kafka的步骤,包括集群的搭建、配置参数的调整以及与Zookeeper的集成。这部分内容对于实际操作Kafka至关重要,帮助读者理解如何在生产环境中部署和管理Kafka集群。 书中还详细介绍了...

    Storm流计算项目:1号店电商实时数据分析系统-08.storm-kafka 详解和实战案例.pptx

    《Storm流计算项目:1号店电商实时数据分析系统——storm-kafka详解与实战案例》 在大数据处理领域,实时计算已经成为不可或缺的一部分,特别是在电商行业中,实时数据分析能够帮助企业快速响应市场变化,提高运营...

    Kafka多维度系统精讲,从入门到熟练掌握

    Kafka多维度系统精讲,从入门到...本课从搭建开始,全面剖析Kafka,解读和使用核心API,将底层实现和设计原理融合贯穿,同时结合案例,把原理落地。更有凝结老师心血的Kafka最佳配置方式推荐,以及面试常问知识点梳理。

    spring-kafka文档.zip

    2. ConsumerFactory与ProducerFactory:这两个工厂类用于创建Kafka消费者和生产者的实例,它们可以根据配置动态地生成连接到Kafka集群的客户端。 3. Container配置:MessageListenerContainer是Spring Kafka处理...

    kafka视频+文档资料.rar

    1. **安装与配置**:如何在本地或集群环境中搭建Kafka,并进行基本的配置。 2. **API使用**:展示Java、Python等语言的Kafka API,如何创建生产者和消费者,发送与接收消息。 3. **实战案例**:通过具体的项目或...

    企业大数据处理:Spark、Druid、Flume与Kafka应用实践(超清完整版).pdf

    例如,书中可能会介绍如何利用Spark进行大规模数据的并行处理,如何使用Druid实现实时数据查询和分析,以及如何借助Flume和Kafka搭建可靠高效的数据流管道等。通过这些实战案例的学习,读者不仅可以掌握核心技术,还...

    Hadoop+Spark生态系统操作与实战指南.epub

    第2部分(第8~11章)讲解Spark的原生态组件,包括SparkCore、SparkSQL、SparkStreaming、DataFrame,以及介绍Scala、SparkAPI、SparkSQL、SparkStreaming、DataFrame原理和CDH版本环境下实战操作,其中Flume和Kafka...

    Storm流计算项目:1号店电商实时数据分析系统-05.Kafka基础知识和集群搭建.pptx

    01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java API 简单开发...

    spring-boot实战PDF 完整版和随书源码.7z

    7. 身份验证与令牌管理:实现OAuth2或JWT(JSON Web Tokens)的身份验证机制。 通过阅读这本书和实践源码,开发者不仅能了解Spring Boot的基本用法,还能掌握其高级特性和最佳实践,从而提升开发效率,打造高质量的...

    基于Storm流计算天猫双十一作战室项目实战

    - **CDH5生态环境**:课程指导学员搭建基于Cloudera CDH5的完整生态环境,并通过Cloudera Manager进行界面化的管理操作,极大地简化了Hadoop平台的部署和维护工作。 **5. 经验分享与优化建议** - **项目架构设计**...

    Storm流计算之项目篇-项目实战:基于1号店的电商实时数据分析系统 共41个章节 含全部PPT课件和源代码.rar

    01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java API 简单开发...

    Storm流计算项目:1号店电商实时数据分析系统-06.Kafka基本操作和最优设置.pptx

    01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java API 简单开发...

    1号店电商实时数据分析系统-14.项目1-地区销售额-Spout融合Kafka Consumer及线程安全测试.pptx

    01.Storm项目实战课程大纲02.CDH5搭建之CM5安装部署03.CDH5搭建和CM界面化集群管理04.Hadoop、HBase、Zookeeper集群管理和角色分配05.Kafka基础知识和集群搭建06.Kafka基本操作和最优设置07.Kafka Java API 简单开发...

    大数据工程师 Flink技术与实战 源码笔记下载

    1. **Flink安装与环境配置**:如何搭建本地或分布式Flink集群,配置参数优化等。 2. **基本概念和架构**:介绍Flink的基本概念,如Job、Task、Operator、DataStream,以及Flink的运行架构。 3. **Flink程序开发**...

    基于 Springboot + Redis + Kafka 的秒杀系统.zip

    综上所述,本毕业设计项目通过整合Spring Boot、Redis和Kafka等技术,实现了高效、稳定的秒杀系统,为理解和实践大型分布式系统提供了实战案例。在实际应用中,还需要考虑更多的细节,如安全性、可扩展性、容错性等...

Global site tag (gtag.js) - Google Analytics