- 浏览: 359342 次
- 性别:
- 来自: 北京
文章分类
最新评论
-
liulehua_123:
...
使用netty+spring搭建游戏框架 -
heng123:
netty等视频java.5d6b.com教程
使用netty+spring搭建游戏框架 -
di1984HIT:
、
redis 主从同步配置方案 -
di1984HIT:
学习了~~
使用netty+spring搭建游戏框架 -
di1984HIT:
不错,学习了~~
征服flume之三——使用log4j输出日志到flume
producer
配置文件
property
编解码部分如果使用字符串需要设置,默认是字节数组
使用异(async)通信时,消息队列默认发送时间间隔由queue.buffering.max.ms决定(kafka.producer.async.AsyncProducerConfig中),默认时间间隔为5000ms,也就是说异步方式默认每5s发送一次消息
Customer
配置文件
这里需要注意两个参数:
fixed-delay:即从上一个任务完成开始到下一个任务开始的间隔,单位是毫秒。即每次sleep的时间间隔。
fixed-rate: 即从上一个任务开始到下一个任务开始的间隔,单位是毫秒。即每次获取消息的时间间隔。
consumer-timeout:如果在指定的时间间隔后,没有发现可用的消息可消费,则抛出一个timeout异常,我的理解是处理完一个消息后等待fixed-delay+consumer-timeout时间间隔,如果还没消息就重连(不知道理解的对不对,不过实验证明将consumer-timeout值修改后会影响接收消息的频率)
结合spring,还可以使用Spring Integration方式进行配置
service实现
public class KafkaServiceImpl implements KafkaService { private Producer<byte[], byte[]> inner; private Properties properties; public void setInner(Producer<byte[], byte[]> inner) { this.inner = inner; } public void setProperties(Properties properties) { this.properties = properties; } public void init() throws IOException { ProducerConfig config = new ProducerConfig(properties); inner = new Producer<byte[], byte[]>(config); } @Override public void sendMessage(String topicName, byte[] message) { if (topicName == null || message == null) { return; } KeyedMessage<byte[], byte[]> km = new KeyedMessage<byte[], byte[]>(topicName, "".getBytes(), message); inner.send(km); } }
配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:util="http://www.springframework.org/schema/util" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:context="http://www.springframework.org/schema/context" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.0.xsd"> <bean id="producerConfig" class="org.springframework.beans.factory.config.PropertiesFactoryBean"> <property name="locations"> <list> <value>config/properties/producer.properties</value> </list> </property> </bean> <bean id="kafkaService" class="com.aiyou.gamecloud.kafka.KafkaServiceImpl" init-method="init"> <property name="properties" ref="producerConfig"/> </bean> </beans>
property
metadata.broker.list=192.168.113.181:9092 producer.type=async compression.codec=0 #serializer.class=kafka.serializer.StringEncoder #key.serializer.class=kafka.serializer.StringEncoder
编解码部分如果使用字符串需要设置,默认是字节数组
使用异(async)通信时,消息队列默认发送时间间隔由queue.buffering.max.ms决定(kafka.producer.async.AsyncProducerConfig中),默认时间间隔为5000ms,也就是说异步方式默认每5s发送一次消息
Customer
import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.channel.QueueChannel; import org.springframework.messaging.Message; import com.google.protobuf.InvalidProtocolBufferException; import kafka.consumer.Consumer; /** * @project: gate * @Title: KafkaCustomerService.java * @author: chenpeng * @email: 46731706@qq.com * @date: 2016年1月14日下午1:40:47 * @description: * @version: */ public class KafkaCustomerService { private static final Logger logger = LoggerFactory.getLogger(KafkaCustomerService.class); private static final String CONFIG = "kafka-customer-config.xml"; private static Random rand = new Random(); public static void main(String[] args) { final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer.class); ctx.start(); final QueueChannel channel = ctx.getBean("inputFromKafka", QueueChannel.class); Message msg; while ((msg = channel.receive()) != null) { HashMap map = (HashMap) msg.getPayload(); System.out.println("Here in disb ================" + map.size()); Set<Map.Entry> set = map.entrySet(); for (Map.Entry entry : set) { String topic = (String) entry.getKey(); System.out.println("Topic:" + topic); ConcurrentHashMap<Integer, List<byte[]>> messages = (ConcurrentHashMap<Integer, List<byte[]>>) entry .getValue(); Collection<List<byte[]>> values = messages.values(); for (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) { List<byte[]> list = iterator.next(); System.out.println("================" + list.size()); for (byte[] bytes : list) { try { BroadcastMessage message = BroadcastMessage.parseFrom(bytes); logger.debug(message.getGameId()); } catch (InvalidProtocolBufferException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } } try { Thread.sleep(100000); } catch (InterruptedException e) { e.printStackTrace(); } ctx.close(); } }
配置文件
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd"> <int:channel id="inputFromKafka" ><int:queue/></int:channel> <int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext" auto-startup="false" channel="inputFromKafka"> <int:poller fixed-delay="10" time-unit="MILLISECONDS" max-messages-per-poll="50" /> </int-kafka:inbound-channel-adapter> <int-kafka:consumer-context id="consumerContext" consumer-timeout="10" zookeeper-connect="zookeeperConnect" > <int-kafka:consumer-configurations> <int-kafka:consumer-configuration group-id="default" max-messages="5000"> <int-kafka:topic id="websocket_01" streams="1" /> </int-kafka:consumer-configuration> </int-kafka:consumer-configurations> </int-kafka:consumer-context> <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="192.168.113.181:2121" zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="200" /> </beans:beans>
这里需要注意两个参数:
fixed-delay:即从上一个任务完成开始到下一个任务开始的间隔,单位是毫秒。即每次sleep的时间间隔。
fixed-rate: 即从上一个任务开始到下一个任务开始的间隔,单位是毫秒。即每次获取消息的时间间隔。
consumer-timeout:如果在指定的时间间隔后,没有发现可用的消息可消费,则抛出一个timeout异常,我的理解是处理完一个消息后等待fixed-delay+consumer-timeout时间间隔,如果还没消息就重连(不知道理解的对不对,不过实验证明将consumer-timeout值修改后会影响接收消息的频率)
结合spring,还可以使用Spring Integration方式进行配置
<?xml version="1.0" encoding="UTF-8"?> <beans:beans xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:beans="http://www.springframework.org/schema/beans" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd"> <int:channel id="inputFromKafka"></int:channel> <int:service-activator auto-startup="true" input-channel="inputFromKafka" ref="disbService" method="distribute"> </int:service-activator> <int:poller default="true" id="default" fixed-rate="5" time-unit="MILLISECONDS"> </int:poller> <int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext" channel="inputFromKafka"> </int-kafka:inbound-channel-adapter> <int-kafka:consumer-context id="consumerContext" consumer-timeout="5" zookeeper-connect="zookeeperConnect"> <int-kafka:consumer-configurations> <int-kafka:consumer-configuration group-id="default" max-messages="5000"> <int-kafka:topic id="${gateId}" streams="1" /> </int-kafka:consumer-configuration> </int-kafka:consumer-configurations> </int-kafka:consumer-context> <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="192.168.113.181:2121" zk-connection-timeout="6000" zk-session-timeout="6000" zk-sync-time="2000" /> </beans:beans>
service实现
import java.util.Collection; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; import com.google.protobuf.InvalidProtocolBufferException; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame; @Service("disbService") public class DisbService { private static final Logger logger = LoggerFactory.getLogger(DisbService.class); @Value("#{configProperties['server.requestType']}") private String requestType = ERequestType.SOCKET.getValue(); @Autowired private CacheService cacheService; @SuppressWarnings({ "rawtypes", "unchecked" }) public void distribute(HashMap map) { System.out.println("Here in disb ================" + map.size()); Set<Map.Entry> set = map.entrySet(); for (Map.Entry entry : set) { String topic = (String) entry.getKey(); logger.debug("Topic:" + topic); ConcurrentHashMap<Integer, List<byte[]>> messages = (ConcurrentHashMap<Integer, List<byte[]>>) entry .getValue(); Collection<List<byte[]>> values = messages.values(); for (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) { List<byte[]> list = iterator.next(); System.out.println("================" + list.size()); for (byte[] bytes : list) { // 这里获取到的是广播的信息 try { if (ERequestType.HTTP.getValue().equals(requestType)) { // 缓存起来!!! } else { Message.BroadcastMessage message = Message.BroadcastMessage.parseFrom(bytes); if (message.getUserIdsList().isEmpty()) { List<ChannelCache> channelList = cacheService.getGameChannelList(topic, message.getGameId(), message.getServerId()); for (ChannelCache channelCache : channelList) { sendMessage(channelCache, message); } } for (String userId : message.getUserIdsList()) { ChannelCache channelCache = cacheService.getCachedChannel(message.getGameId(), message.getServerId(), userId); sendMessage(channelCache, message); } } } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } } } } private void sendMessage(ChannelCache channelCache, Message.BroadcastMessage message) { if (channelCache != null) { switch (ERequestType.parse(requestType)) { case HTTP: break; case SOCKET: try { if (channelCache.getChannel().isActive()) { ByteBuf messageData = Unpooled.buffer(); messageData.writeInt(message.getMessage().toByteArray().length); messageData.writeBytes(message.getMessage().toByteArray()); channelCache.getChannel().writeAndFlush(messageData).sync(); } else { cacheService.userLogout(channelCache.getChannel().hashCode()); } } catch (InterruptedException e) { e.printStackTrace(); } break; case WEBSOCKET: try { if (channelCache.getChannel().isActive()) { ByteBuf messageData = Unpooled.buffer(); messageData.writeInt(message.getMessage().toByteArray().length); messageData.writeBytes(message.getMessage().toByteArray()); channelCache.getChannel().writeAndFlush(new BinaryWebSocketFrame(messageData)).sync(); } else { cacheService.userLogout(channelCache.getChannel().hashCode()); } } catch (InterruptedException e) { e.printStackTrace(); } default: break; } } } }
发表评论
-
Java编程规范
2018-05-21 12:59 1012Java编程规范 一、 命名约束 1. [强制] 代码中的命名 ... -
java面试整理二——JVM
2018-02-25 16:07 8621. JVM的主要结构 JVM ... -
java 面试整理一——基础知识
2018-02-25 16:04 6711. short s1 = 1; s1 = s1 + 1; ... -
基于jquery mobile+websocket+protocol buffer的IM开发
2016-01-15 10:46 3832第一次研究有关web端的框架,结合工作需要,决定搞一个基于we ... -
linux操作系统下 Jenkins+SVN+Maven+Tomcat 自动集成环境搭建
2016-01-05 15:59 5849一、准备工作 jdk(jdk1.7.0_65.zip) Mav ... -
tcp 连接关闭详解
2014-07-10 14:48 4122主动发起关闭TCP链接端 ... -
Linux下Java线程状态分析
2014-06-11 14:39 5261在Linux上输入top 进入top后按【shift】+【H】 ... -
java面试整理(1)
2012-12-04 15:44 11701. short s1 = 1; s1 = s1 + 1; ... -
Java并发之——同步与原子性
2012-08-27 13:08 1435每一个线程自顾自的做 ... -
基于netty的websocket开发小结
2012-07-27 18:03 60225WebSocket是html5规范新引入的功能,用于解决浏览器 ... -
spring集成log4j配置信息
2012-07-17 14:32 2337spring中集成log4j并非难事,只需要在web.xml ... -
J2EE开发中的java字符编码问题经验总结
2012-07-16 12:28 1573在J2EE程序开发过程中,经常遇到字符的编码问题。这一问题困扰 ... -
使用 Netty 进行 UDP 网络编程
2012-07-16 11:33 32894使用 Netty 进行 UDP 网络编程 在正式开始 ... -
将json字符串转换为对象
2012-07-13 09:52 1460准备将这几年的工作进行一个系统的总结,会陆续发些东西出来供大家 ... -
使用netty+spring搭建游戏框架
2012-07-12 18:52 51572一、 java NIO简介 nio是java New IO的简 ...
相关推荐
Kafka与Spring集成指南 Kafka分布式消息系统是一种高吞吐量、可扩展、基于发布订阅模式的消息系统,广泛应用于大数据处理、实时数据处理和日志处理等领域。Spring Framework是一种流行的Java应用程序框架,提供了一...
在本文中,我们将深入探讨如何将Spring框架与Apache Kafka集成,以便实现...压缩包中的"Kafka与spring集成的代码"文件应该包含了这些示例的完整实现,供你参考和学习。记得根据实际的Kafka集群配置和业务需求调整代码。
在Spring Cloud中集成Kafka,你需要: 1. 添加依赖:在`pom.xml`文件中引入`spring-kafka`依赖。 2. 配置Kafka:配置Kafka服务器地址、端口等信息。 3. 定义消息:创建消息的Java类,可以使用`@KafkaListener`注解...
在这个项目中,Spring Boot通过其内置的自动配置能力与Kafka进行集成。以下是一些关键点: 1. **依赖管理**:在`pom.xml`中,我们需要添加Spring Boot对Kafka的依赖。这通常包括`spring-boot-starter-data-jpa`和`...
在本文中,我们将深入探讨如何将Spring Boot框架与Apache Kafka进行集成,以便在微服务架构中实现高效的数据流处理。Apache Kafka是一个分布式流处理平台,它允许开发者构建实时数据管道和流应用程序。Spring Boot...
spring-kafka是Spring框架对Apache Kafka消息系统进行整合的一个项目,它简化了Kafka的使用,使其更容易与Spring应用程序集成。Apache Kafka是一个分布式流处理平台,主要用于构建实时数据管道和流应用程序。它具备...
它的客户端命令以及核心配置文件是其操作的核心,而spring集成和webflux集成则可以将Kafka3.0与主流的Java开发框架相结合,集群搭建则是其分布式特性的基础。 消息系统是一种应用程序,用于在不同的软件组件之间...
在Spring Boot应用中,我们可以利用Spring Kafka框架来与Apache Kafka进行集成,实现高效的消息传递。本文将详细探讨如何在Spring Boot项目中基于Spring Kafka动态创建Kafka消费者。 首先,了解Kafka基本概念:...
### SpringCloud与Kafka消息中间件集成教程 #### 一、SpringCloud概述 Spring Cloud 是一个基于Spring Boot的框架,旨在提供一系列开箱即用的工具和服务,帮助开发者轻松构建和部署微服务架构的应用程序。它集合了...
Spring Kafka 是 Spring 框架为集成 Apache Kafka 提供的一个模块,它使得在 Java 应用程序中使用 Kafka 变得简单且直观。本项目 "spring-kafka-demo" 提供了一个完整的示例,涵盖了生产者和消费者的配置与实现,...
3. **Spring Cloud Stream 与 Kafka 集成**: - 使用 Spring Cloud Stream 集成 Kafka 需要在配置中指定消息代理类型(Binder),例如 `spring.cloud.stream.kafka.bindings`。 - 应用程序可以通过 ...
Spring Kafka是Spring框架的一部分,专为集成Apache Kafka而设计,提供了一套轻量级且强大的API,使得在Java应用中使用Kafka变得更加简单。本文将围绕Spring Kafka的源代码进行深度解析,帮助开发者更好地理解和运用...
在`springboot-kafka`目录下,可能包含了一个示例项目,你可以通过导入IDE进行编译和运行,以了解Spring Boot与Kafka集成的实践操作。 通过以上步骤,你已经掌握了如何在Spring Boot 2.7.3中集成Apache Kafka的基本...
在 "kafka-0.10-demo" 文件中,我们可以预期找到一个使用 Spring Kafka 与 Kafka 0.10 版本集成的示例项目。这个示例可能包含生产者和消费者代码,以及相关的配置文件,帮助我们理解如何在实践中使用 Spring Kafka。...
Spring Kafka是Spring框架对Apache Kafka的集成,它允许开发者在Spring应用中轻松地使用Kafka作为消息传递系统。Apache Kafka是一种分布式流处理平台,常用于构建实时数据管道和流应用程序。下面将详细介绍Spring ...
Spring Kafka是Spring框架的一部分,它为Java开发者提供了一种集成Apache Kafka的简便方式,使我们能够充分利用Kafka的分布式消息传递能力。本文将深入探讨Spring Kafka的核心概念、主要功能以及实际应用,旨在帮助...
在本文中,我们将深入探讨如何将Kafka集成到SpringBoot项目中,并且会涉及SpringBoot版本与Swagger版本匹配的重要性,以避免出现访问Swagger文档时的空指针异常。Kafka作为一个分布式流处理平台,常被用于构建实时...
在本项目中,我们将深入探讨如何使用Spring Boot与Kafka进行集成,实现一个实战项目,包括Kafka的生产者、消费者以及如何创建Topic,并且特别关注指定消费分区这一高级特性。Kafka是一款高吞吐量的分布式消息系统,...
- **源码分析**:深入理解Spring Kafka的内部实现,例如消费者和生产者的生命周期管理,如何读写消息,以及如何与其他Spring组件(如TransactionManager)集成。 - **工具使用**:可能会介绍一些辅助工具,如Kafka的...