kafka spring demo
下载:http://download.csdn.net/download/knight_black_bob/9709057
安装详解 :http://knight-black-bob.iteye.com/blog/2343192
使用定时器发送后 结果如下
kafka 代码安装
15.安装kafka cd /usr/local/ wget http://mirror.bit.edu.cn/apache/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz tar xf kafka_2.10-0.10.0.0.tgz ln -s /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka chown -R hdfs:hadoop /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka chown -R root:root /usr/local/kafka_2.10-0.10.0.0 /usr/local/kafka /usr/local/zookeeper/bin/zkCli.sh create /kafka '' vim /usr/local/kafka/config/server.properties broker.id=0 zookeeper.connect=dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev05.aoiplus.openpf:/usr/local/ scp -r /usr/local/kafka_2.10-0.10.0.0.tgz root@dev06.aoiplus.openpf:/usr/local/ scp -r /usr/local/kafka/config/server.properties root@dev05.aoiplus.openpf:/usr/local/kafka/config/server.properties scp -r /usr/local/kafka/config/server.properties root@dev06.aoiplus.openpf:/usr/local/kafka/config/server.properties master slave 启动 /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties & 创建topic /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --replication-factor 3 --partitions 5 --topic baoy-topic /usr/local/kafka/bin/kafka-topics.sh --describe --zookeeper dev10.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181/kafka --topic baoy-topic /usr/local/kafka/bin/kafka-console-producer.sh --broker-list dev10.aoiplus.openpf:9092,dev05.aoiplus.openpf:9092,dev06.aoiplus.openpf:9092 --topic baoy-topic /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper dev10.aoiplus.openpf:2181,dev05.aoiplus.openpf:2181,dev06.aoiplus.openpf:2181/kafka --from-beginning --topic baoy-topic
安装完成 后测试
productor
consumer
spring 接受信息
代码部分
applicationContext-kafka-productor.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <!-- commons config --> <bean id="stringSerializer" class="org.apache.kafka.common.serialization.StringSerializer" /> <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroReflectDatumBackedKafkaEncoder"> <constructor-arg value="java.lang.String" /> </bean> <bean id="producerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="properties"> <props> <prop key="topic.metadata.refresh.interval.ms">3600000</prop> <prop key="message.send.max.retries">5</prop> <prop key="serializer.class">kafka.serializer.StringEncoder</prop> <prop key="request.required.acks">1</prop> </props> </property> </bean> <!-- topic test config --> <int:channel id="pChannel"> <int:queue /> </int:channel> <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapterProductor" kafka-producer-context-ref="producerContext" auto-startup="true" channel="pChannel" order="3"> <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="1" task-executor="taskProductorExecutor" /> </int-kafka:outbound-channel-adapter> <task:executor id="taskProductorExecutor" pool-size="5" keep-alive="120" queue-capacity="500" /> <int-kafka:producer-context id="producerContext" producer-properties="producerProperties"> <int-kafka:producer-configurations> <int-kafka:producer-configuration broker-list="172.23.27.120:9092,172.23.27.115:9092,172.23.27.116:9092" key-serializer="stringSerializer" value-class-type="java.lang.String" value-serializer="stringSerializer" topic="baoy-topic" /> </int-kafka:producer-configurations> </int-kafka:producer-context> </beans>
applicationContext-kafka-consumer.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd"> <!-- topic test conf --> <int:channel id="cChannel"> <int:dispatcher task-executor="kafkaMessageExecutor" /> </int:channel> <!-- zookeeper配置 可以配置多个 --> <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka" zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="2000" /> <!-- channel配置 auto-startup="true" 否则接收不发数据 --> <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext" auto-startup="true" channel="cChannel"> <int:poller fixed-delay="1" time-unit="MILLISECONDS" /> </int-kafka:inbound-channel-adapter> <task:executor id="kafkaMessageExecutor" pool-size="8" keep-alive="120" queue-capacity="500" /> <bean id="kafkaDecoder" class="org.springframework.integration.kafka.serializer.common.StringDecoder" /> <bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="properties"> <props> <prop key="auto.offset.reset">smallest</prop> <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M --> <prop key="fetch.message.max.bytes">5242880</prop> <prop key="auto.commit.interval.ms">1000</prop> </props> </property> </bean> <!-- 消息接收的BEEN --> <bean id="kafkaConsumerService" class="com.curiousby.baoy.cn.kafka.KafkaConsumerService" /> <!-- 指定接收的方法 --> <int:outbound-channel-adapter channel="cChannel" ref="kafkaConsumerService" method="process" /> <int-kafka:consumer-context id="consumerContext" consumer-timeout="1000" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties"> <int-kafka:consumer-configurations> <int-kafka:consumer-configuration group-id="default" value-decoder="kafkaDecoder" key-decoder="kafkaDecoder" max-messages="5000"> <int-kafka:topic id="baoy-topic" streams="5" /> </int-kafka:consumer-configuration> </int-kafka:consumer-configurations> </int-kafka:consumer-context> </beans>
KafkaConsumerService
@Service public class KafkaConsumerService { public void process(Map<String, Map<Integer, String>> msgs) { for (Map.Entry<String, Map<Integer, String>> entry : msgs.entrySet()) { System.out.println("======================================Consumer Message received: "); System.out.println("=====================================Suchit Topic:" + entry.getKey()); for (String msg : entry.getValue().values()) { System.out.println("================================Suchit Consumed Message: " + msg); } } } }
KafkaProductorService
@Service public class KafkaProductorService { @Autowired @Qualifier("pChannel") private MessageChannel messageChannel; public void sendInfo(String topic, Object obj) { System.out.println("---Service:KafkaService------sendInfo------"); messageChannel.send(MessageBuilder.withPayload(obj).setHeader(KafkaHeaders.TOPIC,topic).build()); } }
pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.curiousby.baoyou.cn</groupId> <artifactId>SpringKafkaDEMO</artifactId> <packaging>war</packaging> <version>0.0.1-SNAPSHOT</version> <name>SpringKafkaDEMO Maven Webapp</name> <url>http://maven.apache.org</url> <!-- properties constant --> <properties> <spring.version>4.2.5.RELEASE</spring.version> </properties> <dependencies> <!-- junit4 --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.7</version> <type>jar</type> <scope>test</scope> </dependency> <dependency> <groupId>org.dbunit</groupId> <artifactId>dbunit</artifactId> <version>2.4.9</version> <scope>test</scope> </dependency> <dependency> <groupId>com.github.springtestdbunit</groupId> <artifactId>spring-test-dbunit</artifactId> <version>1.1.0</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <version>${spring.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjrt</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>org.aspectj</groupId> <artifactId>aspectjweaver</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-aspects</artifactId> <version>${spring.version}</version> <type>jar</type> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-web</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>${spring.version}</version> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version>1.3.0.RELEASE</version> </dependency> <dependency> <groupId>commons-logging</groupId> <artifactId>commons-logging</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.4</version> <type>jar</type> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.4</version> <type>jar</type> </dependency> <dependency> <groupId>javax</groupId> <artifactId>javaee-api</artifactId> <version>7.0</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-core</artifactId> <version>2.7.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.7.6</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> <version>2.7.6</version> </dependency> <dependency> <groupId>org.apache.avro</groupId> <artifactId>avro</artifactId> <version>1.7.7</version> </dependency> </dependencies> <build> <finalName>SpringKafkaDEMO</finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <dependencies> <dependency> <groupId>org.codehaus.plexus</groupId> <artifactId>plexus-compiler-javac</artifactId> <version>2.5</version> </dependency> </dependencies> <configuration> <source>1.7</source> <target>1.7</target> <encoding>UTF-8</encoding> <compilerArguments> <verbose /> <bootclasspath>${java.home}/lib/rt.jar:${java.home}/lib/jce.jar</bootclasspath> </compilerArguments> </configuration> </plugin> </plugins> </build> </project>
遇到的问题:
1. spring 中 日志 中的 logback 必须 保持一致 ,这里我使用 org.slf4j 1.6.4
<groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.6.4</version> <type>jar</type> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.6.4</version> <type>jar</type> </dependency>
2. hosts 问题,之前一直 有连接 连不同, zookeeper 使用 172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181 。是可以连通,但这不是我需要的 ,我需要的是172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka ,,
然后 配置成172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka 就报错,怎么也 不成功,之后 配置172.23.27.120:2181,172.23.27.115:2181,172.23.27.116:2181/kafka的时候发现是 域名了,然后联不通,在本地添加 hosts 就可以解决该问题了。
3.使用 jar 包
<dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version>1.3.0.RELEASE</version> </dependency>
捐助开发者
在兴趣的驱动下,写一个免费
的东西,有欣喜,也还有汗水,希望你喜欢我的作品,同时也能支持一下。 当然,有钱捧个钱场(右上角的爱心标志,支持支付宝和PayPal捐助),没钱捧个人场,谢谢各位。
谢谢您的赞助,我会做的更好!
相关推荐
spring-boot-starter-kafka示例程序\n 支持springcloud1.5.4,kafka0.8.2.x\n 项目地址:https://github.com/zhyea/spring-boot-starter-kafka
本文将详细介绍如何将Kafka与Spring集成,并提供了详细的实例代码。 软件环境 要完成Kafka与Spring集成,需要以下软件环境: * OS:CentOS 6.4 * ZooKeeper:zookeeper-3.4.6 * Kafka:kafka_2.9.1-0.8.2-beta * ...
在Spring Cloud框架中,整合RabbitMQ或Kafka作为消息驱动是常见的微服务间通信方式。这两种技术都是流行的消息中间件,用于实现异步处理、解耦和扩展性。下面将详细阐述它们在Spring Cloud中的应用。 首先,...
《Spring Kafka 深度解析》 Spring Kafka是Spring框架的一部分,它为Java开发者提供了一种集成Apache Kafka的简便方式,使我们能够充分利用Kafka的分布式消息传递能力。本文将深入探讨Spring Kafka的核心概念、主要...
标题中的“Spring Kafka 整合”指的是在Spring框架中集成Apache Kafka这一消息中间件的过程。Kafka是一个分布式的、基于发布/订阅的消息系统,而Spring Kafka则是Spring对Kafka的官方支持,允许开发者轻松地在Spring...
标题中的“spring-kafka-producer-consumer-example”表明这是一个关于Spring Boot应用,它使用了Apache Kafka作为消息中间件,展示了生产者(producer)和消费者(consumer)的实现。描述中的“Simple application ...
**Kafka实例资源** 在IT行业中,Apache Kafka是一款广泛使用的分布式流处理平台,它被设计为高吞吐、低延迟的消息系统。Kafka的核心功能包括发布订阅消息队列、存储和处理大规模实时数据流。本资源包是针对Kafka...
Spring Kafka 支持基于 Spring 注解的监听器(@KafkaListener),以及 ConsumerFactory 和 ContainerFactory 来管理消费者实例。 - **Topics**: 主题是 Kafka 中的消息队列,消息被发布到主题,然后由消费者消费。...
### Kafka环境搭建与Spring整合详解 #### 一、Kafka基本概念 Kafka是一款开源的分布式消息系统,它能够提供高吞吐量的数据管道和存储服务。为了更好地理解和使用Kafka,我们首先需要了解以下几个核心概念: 1. **...
创建一个Spring组件,该组件包含一个`KafkaTemplate`实例,用于发送消息到指定的主题: ```java @Component public class KafkaProducer { private final KafkaTemplate, String> kafkaTemplate; @Autowired...
在本文中,我们将深入探讨如何使用Java和Spring框架与Kafka 2.11-1.0.0版本进行交互。Kafka是一个分布式流处理平台,广泛用于实时数据处理和消息传递。它与Zookeeper协作,提供高可用性和可扩展性。我们将重点介绍在...
springboot集成kafka,使用教程参考: https://blog.csdn.net/qq_26482855/article/details/119003639 https://blog.csdn.net/qq_26482855/article/details/119005025
主要介绍了Spring纯Java配置集成kafka代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
标题"扩展logback将日志输出到Kafka实例源码"涉及的技术点主要集中在如何将Logback与Kafka集成,使得日志可以被有效地发送到Kafka集群。这个过程通常涉及到以下几个步骤: 1. **添加依赖**:首先,你需要在项目的...
在运行这个demo实例时,确保你已经安装了Kafka,并且其服务器正在运行。解压`spring-kafka.zip`,运行其中的Spring Boot应用,然后你将能看到Spring如何通过XML配置与Kafka进行通信,发送和接收消息。 总的来说,...
8. **测试与模拟**:在“kafka-examples”中,可能会包含测试用例,使用`Testcontainers`库来启动一个本地的Kafka实例,方便进行集成测试。 总结起来,“kafka-examples”是一个极好的学习资源,涵盖了Spring集成...
Spring整合Kafka是一个常见的任务,尤其对于构建大数据流处理或者实时消息传递的系统至关重要。Spring框架提供了Spring Kafka模块,使得开发者能够轻松地在Java应用中集成Apache Kafka。在这个"Spring整合Kafka的...
创建一个`KafkaTemplate`实例,用于发送消息到Kafka主题: ```java @Autowired private KafkaTemplate, String> kafkaTemplate; public void sendMessage(String topic, String message) { kafkaTemplate.send...
最后,我们可以通过`@SpringBootTest`注解启动Spring Boot应用,并通过`@Autowired`注入生产者和消费者实例进行测试。 总的来说,Spring Boot与Kafka的集成使得构建实时数据处理系统变得简单且灵活。通过这种方式,...
Spring Boot 整合 Spring-Kafka 实现发送接收消息实例代码 Spring Boot 是一个基于 Java 的开源框架,它提供了一个灵活的方式来构建 Web 应用程序。Spring-Kafka 是一个基于 Apache Kafka 的消息队列组件,提供了...