0 0

Netty 4动态添加ChannelHandler问题0

Netty 4 测试
服务端
public void doStart() throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		EventLoopGroup workerGroup = new NioEventLoopGroup();
		ServerBootstrap bootstrap = new ServerBootstrap();
		ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				ChannelPipeline pipeline = ch.pipeline();
				pipeline.addLast(ENCODER, new ProtobufEncoder());
				pipeline.addLast(DECODER_SELECTOR, new ProtobufDecoder(
						BusinessSelector.getDefaultInstance()));
				pipeline.addLast(HANDLER_DISPATCHER, new DispatcherHandler());
			}
		};
		bootstrap.group(bossGroup, workerGroup)
				.channel(NioServerSocketChannel.class)
				.childHandler(initializer)
				.option(ChannelOption.SO_BACKLOG, 128)
				.childOption(ChannelOption.SO_KEEPALIVE, true);

		ChannelFuture channelFuture = bootstrap.bind(SERVER_PORT).sync();
		ChannelFuture closeFuture = channelFuture.channel().closeFuture();
		closeFuture.addListener(ChannelFutureListener.CLOSE);
	}


DispatcherHandler是用来根据BusinessType选择具体业务逻辑的Handler,例如BusinessType为REGISTER,则pipeline会动态添加REGISTER相关的解码器和处理器DECODER_REGISTER、HANDLER_REGISTER,并从pipeline中去掉Business选择器的解码器DECODER_SELECTOR和DispatcherHandler,如下

public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
        if (msg instanceof BusinessSelector) {
            ChannelPipeline pipeline = ctx.pipeline();
            logger.debug("pipeline before select business = {}", pipeline);
            BusinessSelector selector = (BusinessSelector) msg;
            BusinessType businessType = selector.getBusinessType();
            logger.debug("business type = {}", businessType);
            switch (businessType) {
                case REGISTER:
                    pipeline.addLast(ServerConstant.DECODER_REGISTER,
                        new RegisterDecoder(User.getDefaultInstance()));
                    pipeline.addLast(ServerConstant.HANDLER_REGISTER,
                        new RegisterHandler());
                    break;
                default:
                    break;
            }
            pipeline.remove(ServerConstant.DECODER_SELECTOR);
            pipeline.remove(this);
            logger.debug("pipeline after select business = {}", pipeline);
        }
    }


这样debug之后看pipeline中的Handler是正确的,即encoder、register-decoder、register-handler。问题是每次服务端重新启动,客户端第一次访问服务,基本上发过来的消息都触发不了register-handler的channelRead方法,而再次访问的时候就可以了,通过多线程测试了一下,20次访问服务,有时候可能只有十几次能触发channelRead。问题在哪里呢?求大神指点
客户端代码
public static void main(String[] args) throws Exception {
		Client client = Client.getInstance();
		client.setServer("localhost", 18080);
		User user = createUser();
		client.register(user);
	}

public void register(User user) throws InterruptedException {
        Channel channel = connect(BusinessType.REGISTER);
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(user).sync();
        }
    }

 private synchronized Channel connect(BusinessType businessType)
        throws InterruptedException {
        return connect(this.serverHost, this.serverPort, businessType);
    }

    private synchronized Channel connect(String serverHost, int serverPort,
        BusinessType businessType) throws InterruptedException {
        if (channelMap.containsKey(businessType)) {
            return channelMap.get(businessType);
        }
        ChannelFuture connectFuture = bootStrap.connect(serverHost, serverPort);
        Channel channel = connectFuture.channel();
        if (connectFuture.await(MAX_WAIT_MIN, TimeUnit.MINUTES)) {
            boolean connected = connectFuture.isSuccess();
            if (connected) {
                BusinessSelector.Builder selBuilder = BusinessSelector
                    .newBuilder();
                selBuilder.setBusinessType(businessType);
                channel.writeAndFlush(selBuilder.build()).sync();
                channelMap.putIfAbsent(businessType, channel);
            }
            channel.closeFuture().addListener(ChannelFutureListener.CLOSE);
            return channel;
        } else {
            return null;
        }
    }
2014年10月30日 19:01

1个答案 按时间排序 按投票排序

0 0

你在最后做一个fireChannelRead试试

2015年8月05日 01:30

相关推荐

    netty+4G DTU

    2. **实现 ChannelInitializer**:在 ChannelInitializer 中,添加自定义的 ChannelHandler 实例,如 ByteToMessageDecoder 解码器用于解析 4G DTU 发送的二进制数据,以及 MessageToByteEncoder 编码器用于将服务器...

    使用Netty4实现多线程的消息分发

    你可以动态地添加、删除或替换处理器,以适应不同的业务需求。 四、游戏服务器应用 在游戏服务器场景下,消息分发至关重要。例如,game-dispatcher 文件可能包含了具体的处理器实现,如玩家登录验证、游戏状态更新...

    netty4服务端客户端实例

    4. **ChannelPipeline**: 事件处理链,可以添加多个ChannelHandler,并按照顺序处理事件。 5. **EventLoopGroup**: 线程池,负责处理I/O事件。 在IntelliJ IDEA中,首先需要确保已经安装了Java SDK,并配置好项目...

    Netty4+ProtoBuf通信框架

    Netty4+ProtoBuf通信框架是一种高效的网络应用框架,它结合了Netty的高性能和Google的Protocol Buffers(ProtoBuf)的数据序列化能力,用于构建可伸缩、高并发的网络应用程序。在这个项目中,客户端和服务端之间的...

    Netty4事件处理传播机制,java高级开发工程师要求(csdn)————程序.pdf

    开发者可以通过添加、移除和替换 `ChannelHandler` 来动态调整处理逻辑,以满足不同场景的需求。理解和掌握这一机制对于成为一名 Java 高级开发工程师至关重要,因为它有助于构建高效且可扩展的网络应用。

    Netty4编写服务器客户端,自定义编解码,发送自定义消息

    在这个项目中,我们将深入理解如何利用 Netty 4 来编写服务器和客户端,实现自定义的消息编解码,并进行通信。 首先,我们要创建一个自定义的消息类。这个消息类通常会包含必要的字段,比如消息头、消息体等,以...

    Netty4详解二:开发第一个Netty应用程序

    在本篇文章中,我们将深入探讨如何使用Netty4开发第一个Netty应用程序,以此来理解其核心概念和工作原理。 首先,我们要明白Netty的基本架构。Netty基于NIO(非阻塞I/O)设计,通过EventLoop(事件循环)和Channel...

    netty实现sdtp协议

    在 `childHandler()` 方法中,添加我们之前创建的 `ChannelHandler` 链路,这样 Netty 就知道如何处理 SDTP 数据流。 4. **启动服务器**:调用 `ServerBootstrap.bind()` 方法启动服务器,等待硬件设备的连接。当...

    Netty4EchoDemo

    Netty4EchoDemo是一个基于Netty 4框架实现的简单回显客户端(echo client)和服务器(echo server)示例。这个项目适用于那些想要学习如何使用Netty进行网络编程的开发者,尤其是对Java NIO(非阻塞I/O)感兴趣的...

    Netty 教程 Netty权威指南

    4. **Pipeline**:每个 Channel 都有一个 ChannelPipeline,它是一个事件处理链,可以根据需求动态添加或移除 ChannelHandler,实现了事件流的灵活定制。 5. **EventLoop 和 EventLoopGroup**:EventLoop 负责执行 ...

    netty5实现的websocket服务器

    4. **配置WebSocket编码和解码器**:为了将WebSocket帧转换成ByteBuf(Netty的数据传输对象),我们需要添加WebSocketFrameDecoder和WebSocketFrameEncoder到处理链中。 5. **绑定监听端口并启动服务**:通过...

    Netty5.rar

    4. **ChannelHandler** 和 **ChannelPipeline**: ChannelHandler 处理 I/O 事件或拦截 Channel 上的数据。ChannelPipeline 是 ChannelHandler 的链表,数据会在其中流动并被各个 Handler 处理。 **创建 Netty ...

    整合netty实时通讯

    - Netty 提供了一套丰富的 ChannelHandler,用于处理进/出站数据,如解码、编码、日志记录等。这些处理器形成一个处理管道,数据流经管道中的每个处理器。 - Netty 的 ByteBuf 提供了高效的内存管理,避免了频繁的...

    netty学习之ServerChannel

    你可以通过`ChannelPipeline`添加自定义的ChannelHandler,以处理连接建立、数据读写、异常处理等事件。 4. **ChannelPipeline**:ChannelPipeline是处理I/O事件的链条,它包含了一系列ChannelHandler。当数据从...

    netty5.zip

    9. **可扩展性**:Netty 的设计允许用户自定义协议处理,通过编写自定义的ChannelHandler,可以轻松添加新的功能或支持新的协议。 在Netty 5.0.0.Alpha1版本中,可能会包含以下内容: - 源代码:展示了Netty的内部...

    netty4概述.pdf

    4. 支持监听器和回调:Netty允许用户在某些事件发生时添加回调函数,例如连接事件、读写事件等。 5. 零拷贝:Netty在某些情况下支持零拷贝操作,减少了不必要的内存复制。 6. 缓冲池:为了减少内存消耗和垃圾回收的...

    Netty-API-文档中文版

    4. **高效的内存管理**:Netty 使用ByteBuf作为字节缓冲区,它提供了一种更高效的方式管理网络通信中的内存,避免了Java原生的ByteBuffer带来的性能损耗。 5. **强大的编码解码器**:Netty 提供了各种编码解码器,...

    netty-netty-4.1.19.Final.zip_netty_netty学习_rocketmq

    再者,要了解ChannelPipeline的构建,如何添加和移除ChannelHandler,以及它们的执行顺序。最后,熟悉非阻塞I/O和线程模型,这将有助于理解RocketMQ如何处理大量并发连接。 RocketMQ的Broker节点使用Netty作为网络...

    resteasy使用netty

    此外,Netty提供了高度定制化的ChannelHandler,允许开发者灵活地处理网络通信的各种细节。 将RestEasy与Netty结合,可以创建一个轻量级、高性能的REST服务。首先,我们需要在项目中引入RestEasy和Netty的相关依赖...

    netty-all-5.0.0.Alpha3 完整pom.xml配置

    3. **高度可定制的事件处理链**:Netty的ChannelHandler接口允许开发者构建自定义的事件处理链,通过Pipeline机制,每个请求会按照预设顺序在链上进行处理。 4. **丰富的协议支持**:Netty内置了对多种常见网络协议...

Global site tag (gtag.js) - Google Analytics