`
zhangwei_david
  • 浏览: 475867 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

实现RPC就是这么简单 之 Netty 实现(二)心跳检测和粘包处理

    博客分类:
  • Java
阅读更多

    TCP 粘包和拆包

        TCP 是个“流”协议,所谓流,就是没有界限的一串数据。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上,一个完整的数据包可能会被TCP拆分为多个数据包进行发送,也有可能把多个小的数据包封装成一个大的数据包发送,这就是所谓的TCP粘包和拆包问题。

    粘包问题的解决策略

         由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组,这个问题只能够通过上层的应用协议栈设计来解决;大致可以分为:

  • 定长消息
  • 在包尾增加分隔符进行分割
  • 将消息分为消息头和消息体,消息头中包含消息体长度
  • 更复杂的应用层协议

       在这里使用消息头和消息体的方式解决粘包问题。Netty提供了LengthFieldPrepender和LengthFieldBasedFrameDecoder进行消息头部大编码和解码

     本次实在 实现RPC就是这么简单 之 Netty 实现 基础上优化,仅仅贴出了修改代码。

/**
 *
 * @author zhangwei
 * @version $Id: NettyServiceServer.java, v 0.1 2015年8月19日 下午2:08:37 $
 */
public class NettyServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware {

	/** 服务端口号 **/
	private int port = 12000;

	private RpcServerHandler rpcServerHandler;

	private void publishedService() throws Exception {

		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup(5);
		try {
			ServerBootstrap serverBootstrap = new ServerBootstrap();
			serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
					.handler(new LoggingHandler(LogLevel.DEBUG)).childHandler(new ChannelInitializer<SocketChannel>() {
						@Override
						public void initChannel(SocketChannel channel) throws Exception {
							channel.pipeline().addLast(new LengthFieldBasedFrameDecoder(2048, 0, 2, 0, 2))
									.addLast(new LengthFieldPrepender(2))
									.addLast(new RpcDecoder(SrpcRequest.class))
									.addLast(new RpcEncoder(SrpcResponse.class)).addLast(rpcServerHandler)
									.addLast("idleStateHandler", new IdleStateHandler(10, 5, 0))
									.addLast(new ChannelInboundHandlerAdapter() {

								/**
								 * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
								 *      java.lang.Object)
								 */
								public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

									if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
										IdleStateEvent event = (IdleStateEvent) evt;
										if (event.state() == IdleState.READER_IDLE) {
											System.out.println("read 空闲");
											ctx.disconnect();
										} else if (event.state() == IdleState.WRITER_IDLE) {
											System.out.println("write 空闲");
										} else if (event.state() == IdleState.ALL_IDLE) {
											System.out.println("读写都空闲");
										}
									}

								}

							});
						}
					});

			// 绑定主机+端口
			ChannelFuture future = serverBootstrap.bind("127.0.0.1", port).sync();

			// 等待服务监听端口关闭
			future.channel().closeFuture().sync();
		} finally {
			bossGroup.shutdownGracefully();
			workerGroup.shutdownGracefully();
		}
	}

	/**
	 * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		publishedService();
	}

	/**
	 * @see org.springframework.context.Lifecycle#start()
	 */
	@Override
	public void start() {
	}

	/**
	 * @see org.springframework.context.Lifecycle#stop()
	 */
	@Override
	public void stop() {

	}

	/**
	 * @see org.springframework.context.Lifecycle#isRunning()
	 */
	@Override
	public boolean isRunning() {
		return false;
	}

	/**
	 * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
	 */
	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class);
		Map<String, Object> handlerMap = new HashMap<String, Object>();
		if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) {
			for (Object serviceBean : serviceBeanMap.values()) {
				String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf().getName();
				handlerMap.put(interfaceName, serviceBean);
			}
		}
		System.out.println("自动注册的服务SRPC 服务有:" + handlerMap.keySet());
		rpcServerHandler = new RpcServerHandler(handlerMap);
	}
}

 

/**
 *
 * @author zhangwei_PF
 * @version $Id: SrpcRequestSender.java, v 0.1 2015年8月20日 下午2:13:31 $
 */
@Sharable
public class SrpcRequestSender extends SimpleChannelInboundHandler<SrpcResponse> {

	// final CountDownLatch latch = new CountDownLatch(1);

	private BlockingQueue<SrpcResponse> responseHodler = new LinkedBlockingQueue<SrpcResponse>(1);

	// private SrpcResponse response;

	@Override
	public void channelRead0(ChannelHandlerContext ctx, SrpcResponse response) throws Exception {
		// this.response = response;
		// latch.countDown();
		responseHodler.put(response);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		ctx.close();
	}

	public SrpcResponse send(SrpcRequest request, String host, int port) throws Exception {

		EventLoopGroup group = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.DEBUG))
					.handler(new ChannelInitializer<SocketChannel>() {

						@Override
						protected void initChannel(SocketChannel ch) throws Exception {
							ch.pipeline().addLast(new LengthFieldPrepender(2))
									.addLast(new LengthFieldBasedFrameDecoder(2048, 0, 2, 0, 2))
									.addLast(new RpcEncoder(SrpcRequest.class))
									.addLast(new RpcDecoder(SrpcResponse.class))
									.addLast(SrpcRequestSender.this)
									.addLast("idleStateHandler", new IdleStateHandler(10, 10, 0))
									.addLast(new ChannelInboundHandlerAdapter() {

								/**
								 * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext,
								 *      java.lang.Object)
								 */
								public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

									if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {
										IdleStateEvent event = (IdleStateEvent) evt;
										if (event.state() == IdleState.READER_IDLE) {
											System.out.println("read idle");

										} else if (event.state() == IdleState.WRITER_IDLE) {
											System.out.println("write idle");
											ctx.channel().writeAndFlush("ping");
										} else if (event.state() == IdleState.ALL_IDLE) {
											System.out.println("all idle");
											ctx.channel().writeAndFlush("ping");
										}
									}

								}

							});
						}

					});
			ChannelFuture future = bootstrap.connect(host, port).sync();
			Channel channel = future.channel();

			channel.writeAndFlush(request).sync();
			/**
			 *
			 * 使用闭锁实现等待
			 */
			// latch.await();
			SrpcResponse response = responseHodler.take();
			channel.closeFuture();
			return response;
		} finally {
			group.shutdownGracefully();
		}

	}

}

 

 

 

1
1
分享到:
评论

相关推荐

    springboot整合 netty做心跳检测

    springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合 netty做心跳检测 springboot整合...

    netty+websocket实现心跳和断线重连

    在本文中,我们将深入探讨如何利用 Netty 和 WebSocket 实现心跳检测和断线重连机制。 首先,我们需要理解 WebSocket 协议。WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议,它为客户端和服务器提供了低...

    基于netty_4.0.26的心跳检测

    在“基于netty_4.0.26的心跳检测”这个主题中,我们将深入探讨如何在Netty 4.0.26版本中实现心跳检测机制,以确保网络连接的稳定性和可靠性。 心跳检测是网络通信中的一个重要概念,主要用于检测客户端与服务器之间...

    使用Netty解决TCP粘包和拆包问题过程详解

    使用Netty解决TCP粘包和拆包问题过程详解 Netty是一个流行的Java网络编程框架,提供了简洁、灵活的API来处理网络编程的各种问题。其中,解决TCP粘包和拆包问题是Netty的一个重要应用场景。本文将详细介绍使用Netty...

    netty 数据分包、组包、粘包处理机制(部分)1

    Netty 数据分包、组包、粘包处理机制 Netty 中的 LengthFieldBasedFrameDecoder 是一种常用的处理大数据分包传输问题的解决类。该类提供了多种参数来调整帧的解码方式,从而满足不同的应用场景。 1. maxFrame...

    spring整合netty心跳检测

    通过上述步骤,我们就能实现Spring Boot与Netty的整合,并加入心跳检测功能。这有助于我们在分布式系统中构建出更健壮、更可靠的通信框架,提高系统的稳定性,减少因网络问题导致的服务中断。在实际项目中,这种整合...

    Netty4.1实战-手写RPC框架.pdf

    本专题主要通过三个章节实现一个rpc通信的基础功能,来学习RPC服务中间件是如何开发和使用。章节内以源码加说明实战方式来讲解,请尽可能下载源码学习。 - 手写RPC框架第一章《自定义配置xml》 - 手写RPC框架第二章...

    Netty粘包分包服务器端客户端完整例子

    这些问题在基于字节流的通信协议中尤为常见,因为字节流不保留消息边界,需要应用程序自己去识别和处理。 Netty提供了解决粘包和分包的策略,其中`LineBasedFrameDecoder`和`DelimiterBasedFrameDecoder`是两种常用...

    Netty精粹之TCP粘包拆包问题

    ### Netty精粹之TCP粘包拆包问题详解 #### 一、引言 在网络通信领域,尤其是在基于TCP协议的应用程序开发中,经常会遇到“粘包”和“拆包”的问题。这些问题虽然属于较为底层的技术细节,但对于保障数据传输的准确...

    断网断电心跳检测

    Netty心跳检测的实现往往结合ChannelHandlerContext和ByteBuf对象,利用它们来编码和解码心跳包,同时利用Future和ChannelFutureListener来处理超时和异常情况。 在实际应用中,心跳检测还可以与其他机制相结合,如...

    netty拆包粘包解决方案示例

    请注意,实际应用中可能需要更复杂的逻辑,如异常处理、心跳检测等,以确保网络连接的稳定性和可靠性。 在 ZhouzyNetty 压缩包中,可能包含了实现这些功能的示例代码,可以作为学习和参考。通过阅读和理解这些代码...

    netty 断线重连+心跳

    netty使用自带工具类实现断线重连和心跳包

    netty自定义rpc实现

    综上所述,实现一个基于Netty的自定义RPC框架,需要理解Netty的异步I/O模型,设计合理的RPC通信协议,利用Zookeeper进行服务注册与发现,同时考虑服务的高可用性和性能优化。通过分析提供的压缩包文件,我们可以深入...

    一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架。提供服务注册,发现,负载均衡,.zip

    标题中的“一款基于Netty+Zookeeper+Spring实现的轻量级Java RPC框架”揭示了这个项目的核心技术栈,它整合了三个在分布式系统中广泛使用的开源库:Netty、Zookeeper和Spring。让我们逐一深入探讨这三个技术以及它们...

    基于Netty实现了dubbo rpc

    【标题】基于Netty实现Dubbo RPC 在分布式系统中,RPC(Remote Procedure Call)是一种常见且重要的通信机制,它允许一个程序调用另一个在不同进程中运行的程序,就像调用本地函数一样简单。Dubbo作为阿里巴巴开源...

    基于springboot+netty实现的心跳检测-源码

    本项目是基于Spring Boot和Netty框架实现的心跳检测功能,它能够帮助开发者在分布式系统或微服务架构中检测服务是否正常运行,及时发现并处理网络故障。下面将详细解释这个项目涉及的关键知识点。 1. **Spring Boot...

    基于netty实现的支持长连接的rpc

    标题中的“基于netty实现的支持长连接的rpc”是指利用Netty框架构建一个远程过程调用(RPC)系统,该系统能够维持长时间的连接状态,提高通信效率。Netty是一个高性能、异步事件驱动的网络应用程序框架,适用于开发...

    基于springboot+netty实现的心跳检测源码+项目说明文档.zip

    基于springboot+netty实现的心跳检测源码+项目说明文档.zip (1),NioEventLoopGroup是一个处理I / O操作的多线程事件循环。 Netty为不同类型的传输提供各种EventLoopGroup实现。我们在此示例中实现了服务器端应用程序...

    springboot netty4 及时通讯(带心跳检测,zookeeper)

    本文将深入探讨如何使用SpringBoot整合Netty4实现即时通讯,并结合Zookeeper进行服务发现与管理,同时包含心跳检测和下线重连机制。 **一、SpringBoot与Netty集成** SpringBoot以其简化Spring应用初始搭建以及开发...

    Netty 粘包/半包原理与拆包实战 源码

    本实例是《Netty 粘包/半包原理与拆包实战》 一文的源代码工程。 大家好,我是作者尼恩。 在前面的文章中,完成了一个...在开始聊天器实战开发之前,还有一个非常基础的问题,需要解决:这就是通讯的粘包和半包问题。

Global site tag (gtag.js) - Google Analytics