- 浏览: 315716 次
- 性别:
- 来自: 上海
文章分类
- 全部博客 (192)
- Java基础 (36)
- jsp jstl el (0)
- sql (8)
- java--hibernate (3)
- dorado dataset (2)
- svn (3)
- java--知识点 utc 转 date (1)
- java--小功能 (5)
- Java--基础理论 (1)
- js--技术点 (4)
- js--基础理论 (3)
- java--dorado服务端 (1)
- javaee--jstl (1)
- dorado - -客户端 (1)
- struts1 (1)
- ant (1)
- smarty (1)
- log4j (2)
- tomcat (4)
- mysql (2)
- Axis (2)
- oracle (3)
- js 静态分页 (1)
- sql function (2)
- microsoft sql server (1)
- linux vi (1)
- LinkedHashMap (1)
- java-xml (2)
- Java基础,java面试题 (1)
- java面试题 (2)
- sql复制表 (1)
- jdbc-maven (1)
- linux 查看并发数 (2)
- json (1)
- linux (5)
- mybatis (5)
- 计算机基础 (1)
- Collection (0)
- Java.util (1)
- Java (0)
- Java Web (1)
- 大数据,flume (1)
- kafka (3)
- storm (1)
- mongodb (1)
- spring (12)
- ibatis 批量插入 (1)
- eclipse 模板配置教程 (1)
- csv文件生成工具类 (1)
- jetty (1)
- 多线程 (1)
- rabbitmq (1)
- git (1)
- dubbo (2)
- spring boot (1)
- Diamond 配置生成 (1)
- Elasticsearch (1)
- common.io (1)
- commons (2)
- ibatis|mybatis (1)
最新评论
-
NEOGX:
http://www.tuicool.com/articles ...
Json转换利器Gson之实例一-简单对象转化和带泛型的List转化 -
么可k:
可以喔
Can not find the tag library descriptor for "http://java.sun.com/j... -
tianyi_qingwu:
解决了我的一个问题,学习了,谢谢!
Ant+Flex: Java Heap Space -
ldci3gandroid:
if(month==0){ year-=1;mont ...
java获取当前日期一个月后的日期 -
djcbpl:
好像不行啊!我放进去了,还是错的啊
Can not find the tag library descriptor for "http://java.sun.com/j...
1.引用POM:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.3.RELEASE</version>
</dependency>
2,消费者配置-spring-kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.10.240:9092" />
<entry key="group.id" value="test" />
<entry key="enable.auto.commit" value="false" />
<entry key="retries" value="10"/>
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="session.timeout.ms" value="15000" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.IntegerDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<bean id="concurrentKafkaListenerContainerFactory"
class="org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory">
<property name="consumerFactory" ref="consumerFactory" />
<property name="concurrency" value="3" />
<!-- <property name="containerProperties.pollTimeout" value="3000"/> -->
</bean>
<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<!-- 重要!配置topic -->
<constructor-arg value="my-topic"/>
<!--<property name="ackOnError" value="false"/>-->
<property name="messageListener" ref="messageListernerConsumerService"/>
<property name="errorHandler" ref="messageListernerConsumerService" />
</bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
<!--如果需要失败重试就需要配置-->
<!-- 实际执行消息消费的类 -->
<bean id="messageListernerConsumerService" class="com.shenma.paulfrank.kafka.KafkaConsumerListener">
<property name="taskExecutorUtil" ref="taskExecutorUtil" />
</bean>
<!-- 异步线程池 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="20" />
<!-- 最大线程数 -->
<property name="maxPoolSize" value="100" />
<!-- 队列最大长度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="1000" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="300" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
<bean id="taskExecutorUtil" class="com.shenma.paulfrank.kafka.TaskExecutorUtil">
<!-- <constructor-arg ref="taskExecutor" /> -->
<property name="taskExecutor" ref="taskExecutor" />
</bean>
</beans>
3,生产者配置 - spring-kafka-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${com.shenma.mq.address}" />
<entry key="group.id" value="0" />
<entry key="retries" value="10" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="1" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.IntegerSerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory"
class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>
<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="test1" />
</bean>
</beans>
4,发送案例:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(){
kafkaTemplate.send("my-topic", "hello");
}
5,消费案例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MsgConsumer {
@KafkaListener(topics = { "my-topic" })
public void processMessage2(String content) {
System.out.println("test2:"+content);
}
}
地址:
开发环境:
kafka开发环境服务地址:192.168.10.240:9092
kafka监控:192.168.10.240:9000
sit环境
172.16.10.44:9092
生产环境:
10.10.212.27:9092,10.10.228.81:9092,10.10.234.137:9092,10.10.163.126:9092
http://www.cnblogs.com/luotianshuai/p/5206662.html kafka安装
kafka版本:0.11.0
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.3.RELEASE</version>
</dependency>
2,消费者配置-spring-kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="192.168.10.240:9092" />
<entry key="group.id" value="test" />
<entry key="enable.auto.commit" value="false" />
<entry key="retries" value="10"/>
<entry key="auto.commit.interval.ms" value="1000" />
<entry key="session.timeout.ms" value="15000" />
<entry key="key.deserializer"
value="org.apache.kafka.common.serialization.IntegerDeserializer" />
<entry key="value.deserializer"
value="org.apache.kafka.common.serialization.StringDeserializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建consumerFactory bean -->
<bean id="consumerFactory"
class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties" />
</constructor-arg>
</bean>
<bean id="concurrentKafkaListenerContainerFactory"
class="org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory">
<property name="consumerFactory" ref="consumerFactory" />
<property name="concurrency" value="3" />
<!-- <property name="containerProperties.pollTimeout" value="3000"/> -->
</bean>
<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<!-- 重要!配置topic -->
<constructor-arg value="my-topic"/>
<!--<property name="ackOnError" value="false"/>-->
<property name="messageListener" ref="messageListernerConsumerService"/>
<property name="errorHandler" ref="messageListernerConsumerService" />
</bean>
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
</bean>
<!--如果需要失败重试就需要配置-->
<!-- 实际执行消息消费的类 -->
<bean id="messageListernerConsumerService" class="com.shenma.paulfrank.kafka.KafkaConsumerListener">
<property name="taskExecutorUtil" ref="taskExecutorUtil" />
</bean>
<!-- 异步线程池 -->
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<!-- 核心线程数 -->
<property name="corePoolSize" value="20" />
<!-- 最大线程数 -->
<property name="maxPoolSize" value="100" />
<!-- 队列最大长度 >=mainExecutor.maxSize -->
<property name="queueCapacity" value="1000" />
<!-- 线程池维护线程所允许的空闲时间 -->
<property name="keepAliveSeconds" value="300" />
<!-- 线程池对拒绝任务(无线程可用)的处理策略 -->
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
</property>
</bean>
<bean id="taskExecutorUtil" class="com.shenma.paulfrank.kafka.TaskExecutorUtil">
<!-- <constructor-arg ref="taskExecutor" /> -->
<property name="taskExecutor" ref="taskExecutor" />
</bean>
</beans>
3,生产者配置 - spring-kafka-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${com.shenma.mq.address}" />
<entry key="group.id" value="0" />
<entry key="retries" value="10" />
<entry key="batch.size" value="16384" />
<entry key="linger.ms" value="1" />
<entry key="buffer.memory" value="33554432" />
<entry key="key.serializer"
value="org.apache.kafka.common.serialization.IntegerSerializer" />
<entry key="value.serializer"
value="org.apache.kafka.common.serialization.StringSerializer" />
</map>
</constructor-arg>
</bean>
<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory"
class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties" />
</constructor-arg>
</bean>
<!-- 创建kafkatemplate bean,使用的时候,只需要注入这个bean,即可使用template的send消息方法 -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory" />
<constructor-arg name="autoFlush" value="true" />
<property name="defaultTopic" value="test1" />
</bean>
</beans>
4,发送案例:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(){
kafkaTemplate.send("my-topic", "hello");
}
5,消费案例:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class MsgConsumer {
@KafkaListener(topics = { "my-topic" })
public void processMessage2(String content) {
System.out.println("test2:"+content);
}
}
地址:
开发环境:
kafka开发环境服务地址:192.168.10.240:9092
kafka监控:192.168.10.240:9000
sit环境
172.16.10.44:9092
生产环境:
10.10.212.27:9092,10.10.228.81:9092,10.10.234.137:9092,10.10.163.126:9092
http://www.cnblogs.com/luotianshuai/p/5206662.html kafka安装
kafka版本:0.11.0
- spring-kafka-producer.rar (868 Bytes)
- 下载次数: 8
- spring-kafka-consumer.rar (1.3 KB)
- 下载次数: 6
发表评论
-
基于注解 spring aop 示例
2019-03-13 17:19 615import org.springframework.we ... -
spring跨域设置
2018-06-07 15:50 374import org.springframework.ster ... -
spring中scope作用域(转)
2018-04-02 21:08 364今天研究了一下scope的作用域。默认是单例模式,即scope ... -
Spring IOC好处
2018-03-27 20:55 601IOC:控制反转,它是不 ... -
Spring Boot 入门
2017-06-16 17:38 0spring Boot是Spring社区较新的一个项目 ... -
kafka.common.ConsumerRebalanceFailedException
2017-05-31 15:24 632kafka.common.ConsumerRebalance ... -
spring 处理excel 工具类
2016-10-13 11:57 585package com.mobanker.tkj.cw.web ... -
spring 测试对外配置文件
2016-10-09 15:45 527ClassPathXmlApplicationConte ... -
spring mvc 详解
2016-08-19 17:20 5332.1、Spring Web MVC是什么 Spring ... -
spring mvc 静态资源访问
2016-08-19 17:10 657spring mvc 的<mvc;resource ... -
Spring junit 整合
2016-06-27 18:20 822package com.mobanker.tkj.cw.t ... -
Spring 线程池配置,更好实践
2016-06-27 17:07 593<bean id="threadP ... -
Spring集成Quartz定时任务框架介绍和Cron表达式详解
2015-07-27 19:08 514在JavaEE系统中,我们会经常用到定时任务,比如每天 ... -
flume-ng+Kafka+Storm+HDFS 实时系统组合
2015-04-01 18:58 928个人观点:大数据我们都知道hadoop,但并不都是hadoo ...
相关推荐
从这些内容来看,spring-kafka整合官方文档提供了全面的信息,涵盖了从基础的Kafka配置和消息处理,到复杂的事务管理和错误处理机制,再到Kafka Streams的支持。它旨在帮助开发者利用Spring的强大功能,简化Kafka在...
在Spring Cloud框架中,整合RabbitMQ或Kafka作为消息驱动是常见的微服务间通信方式。这两种技术都是流行的消息中间件,用于实现异步处理、解耦和扩展性。下面将详细阐述它们在Spring Cloud中的应用。 首先,...
Spring for Apache Kafka整合概述 Spring for Apache Kafka是Apache Kafka的一个Spring整合实现,旨在提供一个简洁、灵活的消息队列解决方案。下面是Spring for Apache Kafka的概述和关键特性: 概述 Spring for ...
spring整合kafka集群,init.properties配置kafka集群信息(也可以单个kafka服务),kafka-consumer.xml配置消费者监听,kafka-producer.xml配置消息生产者。
在本文中,我们将深入探讨如何将Spring框架与Apache Kafka集成,以便实现在Spring应用中发送和接收消息。Kafka是一个高吞吐量、分布式的发布/订阅消息系统,而Spring框架是Java开发中最广泛使用的应用框架之一。通过...
Spring Kafka 是一个由 Spring 社区开发的库,它为 Apache Kafka 提供了轻量级的、基于 Spring 的整合框架。Kafka 是一个分布式流处理平台,常用于构建实时数据管道和流应用。Spring Kafka 的设计目标是简化与 Kafka...
### Kafka环境搭建与Spring整合详解 #### 一、Kafka基本概念 Kafka是一款开源的分布式消息系统,它能够提供高吞吐量的数据管道和存储服务。为了更好地理解和使用Kafka,我们首先需要了解以下几个核心概念: 1. **...
当我们谈论"基于xml方式的Spring整合Kafka"时,意味着我们将使用XML配置文件来设置Spring与Kafka的集成。 Spring框架允许开发者通过XML配置来声明式地配置bean,包括数据源、事务管理器、以及与Kafka相关的消费者和...
在本文中,我们将深入探讨如何在Spring Boot 2.7.3版本中集成Apache Kafka,以便在微服务架构中实现高效的数据流处理。首先,让我们理解Spring Boot和Kafka的基本概念,然后逐步介绍如何配置和使用它们。 **Spring ...
总结,SpringBoot整合Kafka可以简化消息中间件的使用,通过Spring提供的API和注解,我们可以轻松地实现消息的生产和消费,同时支持自定义分区策略。这使得在实际项目中处理实时数据流变得更加便捷。
Spring整合Kafka是一个常见的任务,尤其对于构建大数据流处理或者实时消息传递的系统至关重要。Spring框架提供了Spring Kafka模块,使得开发者能够轻松地在Java应用中集成Apache Kafka。在这个"Spring整合Kafka的...
在本文中,我们将深入探讨如何在Spring Boot 2.7.3版本的项目中整合ELK(Elasticsearch、Logstash、Kafka)堆栈,以便实现高效且可扩展的日志管理和分析。ELK组合提供了实时日志收集、处理和搜索的能力,而Kafka作为...
在Spring框架中整合Apache Kafka,可以使用注解或者XML配置的方式。这两种方法都允许开发者方便地将消息生产者和消费者集成到Spring应用中。这里我们将详细介绍这两种方式。 ### 一、注解方式 #### 1. 添加依赖 ...
《Spring Boot 整合 Apache Kafka》博文对应的源代码;博文地址:https://blog.csdn.net/shawearn1027/article/details/107067328
在本文中,我们将深入探讨如何使用Spring Boot整合Apache Kafka,以构建一个生产者和消费者的示例。Apache Kafka是一个分布式流处理平台,常被用于构建实时数据管道和流应用。Spring Boot简化了Java开发,提供了开箱...
在本文中,我们将深入探讨如何将Spring Boot与Apache Kafka整合,以便实现发布/消费消息队列模式。Apache Kafka是一款高效、可扩展且分布式的消息中间件,而Spring Boot则是一个简化了Spring应用程序开发的框架。...
Spring Boot 整合 Spring-Kafka 实现发送接收消息实例代码 Spring Boot 是一个基于 Java 的开源框架,它提供了一个灵活的方式来构建 Web 应用程序。Spring-Kafka 是一个基于 Apache Kafka 的消息队列组件,提供了...