`
vanadiumlin
  • 浏览: 504641 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

netty的个人使用心得

 
阅读更多
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。



如果需要客户端和服务器端沟通 分别都需要编写一个 实现了SimpleChannelHandler接口的类,其中类中需要重写的主要方法为

channelConnected() and channelOpen()  这两个方法为  当客户端链接到服务器端得时候和 客户端 channel被创建出来的时候所调用的



channelDisconnected and  channelClosed() 对应上面的两个方法



exceptionCaught 可以获得 对应handler端(服务器或客户端)的异常信息



messageReceived 每个 客户端 发送的信息后  将调用此方法



当编写完某端得程序后(客户端或服务器端) 将编写好的handler需要配置在 实现了ChannelPipelineFactory的类里,ChannelPipelineFactory中有一个需要实现的方法getPipeline将写好的handler配置到其中,在这个 工厂里 可能要添加很多东西

比如说 编解码器,心跳等。。。。



如需要自定义编解码器需要继承:LengthFieldBasedFrameDecoder(解码),OneToOneEncoder(编码)



编解码器(encode,decode)

encode为  调用messageReceived 方法之后调用的方法,则decode方法为 messageReceived 之前调用的方法 ,用于处理自定义包协议的解析于编辑



心跳: 当客户端socket在非正常情况家掉线,如: 断网,断电等特殊问题的时候, 客户端的channel对象不会自动关闭,需要一直接收到客户端的消息,从而判断是否可以和对象构成通信。。 如果 发现客户端空闲时间过长则视为掉线







服务端handler代码如下



package com.djyou.server;

import java.util.logging.Logger;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ChildChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;


public class ChatServerHandler extends SimpleChannelHandler{

public static final ChannelGroup channelGroup = new DefaultChannelGroup();

public int id;

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
   throws Exception {
  System.out.println("进来一个");
}

@Override
public void channelDisconnected(ChannelHandlerContext ctx,
   ChannelStateEvent e) throws Exception {
   super.channelDisconnected(ctx, e);
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
   throws Exception {
  Logger.getAnonymousLogger().info(e.getCause().getMessage());
  ctx.getChannel().close();
  // TODO Auto-generated method stub
  //super.exceptionCaught(ctx, e);
}

@Override
public void childChannelClosed(ChannelHandlerContext ctx,
   ChildChannelStateEvent e) throws Exception {
 
  super.childChannelClosed(ctx, e);
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
   throws Exception {
  System.out.println(this.id++);
 
  //google protocol解码后返回为 ChannelBuffer类型
  if(!(e.getMessage() instanceof ChannelBuffer)) return;
  //获得 消息对象
  ChannelBuffer channelBuffer = (ChannelBuffer)e.getMessage();
 
  //MessageInfo info = Message.MessageInfo.newBuilder().mergeFrom(channelBuffer.copy().array()).build();
  //写回给客户端
  e.getChannel().write(channelBuffer);
 
 
}


}



pieplelineFactory里的代码为



package com.djyou.server;
import static org.jboss.netty.channel.Channels.*;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import org.jboss.netty.handler.timeout.IdleStateHandler;
import org.jboss.netty.util.Timer;

public class ChatPipelineServerFactory implements ChannelPipelineFactory{

private Timer timer;

public ChatPipelineServerFactory(Timer timer){
 
  this.timer = timer;
}

@Override
public ChannelPipeline getPipeline() throws Exception {
 
  ChannelPipeline pipeline = pipeline();
 
  //添加netty默认支持的 编解码器(可自动添加包头,并处理粘包问题)

pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());//对应
pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());//此对象为 netty默认支持protocolbuf的编解码器
    pipeline.addLast("timeout", new IdleStateHandler(timer, 10, 10, 0));//此两项为添加心跳机制 10秒查看一次在线的客户端channel是否空闲,IdleStateHandler为netty jar包中提供的类
  pipeline.addLast("hearbeat", new Heartbeat());//此类 实现了IdleStateAwareChannelHandler接口

  //netty会定时扫描 空闲的channel
  //pipeline.addLast("frameDecoder", new ProtobufDecoder(Message.MessageInfo.getDefaultInstance()));
  //pipeline.addLast("frameEncoder", new ProtobufEncoder());//
  pipeline.addLast("handler", new ChatServerHandler());//将编写好的服务器端的handler添加到这里
 
 
  return pipeline;
}

}



心跳包的代码如下



import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.timeout.IdleState;
import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
import org.jboss.netty.handler.timeout.IdleStateEvent;
public class Heartbeat extends IdleStateAwareChannelHandler{

int i = 0;

@Override
public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
   throws Exception {
  // TODO Auto-generated method stub
  super.channelIdle(ctx, e);
 
  if(e.getState() == IdleState.WRITER_IDLE)
   i++;
 
  if(i==3){
   e.getChannel().close();
  
   System.out.println("掉了。");
  }
}


}



自定义解码器代码



package com.djyou.server;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;

public class Decode extends LengthFieldBasedFrameDecoder{

public Decode(int maxFrameLength, int lengthFieldOffset,
   int lengthFieldLength) {
  super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
  // TODO Auto-generated constructor stub
}

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
   ChannelBuffer buffer) throws Exception {
 
  ChannelBuffer buffs = (ChannelBuffer)super.decode(ctx, channel, buffer);
 
 
  return buffs;
}

}



自定义编码器代码



package com.djyou.server;

import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

public class Encode extends OneToOneEncoder{

@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
   Object msg) throws Exception {
  // TODO Auto-generated method stub
  return null;
}

}
服务端启动代码





package com.djyou.server;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timer;

public class ChatServer {
public static void main(String[] args) {
  ChannelFactory factory = new NioServerSocketChannelFactory(Executors
    .newCachedThreadPool(), Executors.newCachedThreadPool(),
    Runtime.getRuntime().availableProcessors() + 1);

  Timer timer = new HashedWheelTimer();
 
  ServerBootstrap bootstrap = new ServerBootstrap(factory);
  bootstrap.setPipelineFactory(new ChatPipelineServerFactory(timer));
  bootstrap.setOption("child.tcpNoDelay", true);
  bootstrap.setOption("child.keepAlive", true);
  bootstrap.setOption("reuseAddress", true);

  bootstrap.bind(new InetSocketAddress(6666));
}
}



客户端启动代码如下(除客户端启动代码意外 其余的东西都与服务器端一样 都需要编写对应的 编解码器,定时发送消息线程(10秒发个信息给服务端 确保channel不为空闲, 来对应心跳程序), 客户端的handler)



package com.djyou.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

import com.djyou.protoBufModel.Message;
import com.djyou.protoBufModel.Message.MessageInfo;

public class ChatClient {

public static void main(String[] args) throws Exception{
   String host = "localhost";
         int port = 6666;

         // Configure the client.
         ChannelFactory factory =
             new NioClientSocketChannelFactory(
                     Executors.newCachedThreadPool(),
                     Executors.newCachedThreadPool());

         ClientBootstrap bootstrap = new ClientBootstrap(factory);
         ChatPipelineClientFactory  cpcf = new ChatPipelineClientFactory();
         bootstrap.setPipelineFactory(cpcf);

         // Start the connection attempt.
         ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));

         // Wait until the connection attempt succeeds or fails.
         Channel channel = future.awaitUninterruptibly().getChannel();
         if (!future.isSuccess()) {
             future.getCause().printStackTrace();
             System.exit(0);
         }

         // Read commands from the stdin.
         ChannelFuture lastWriteFuture = null;
         BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
//         for (;;) {
//             String line = in.readLine();
//             if (line == null) {
//                 break;
//             }
            
             //创建Builder
             MessageInfo.Builder builder = MessageInfo.newBuilder();
           
             builder.addBody(Message.Body.newBuilder().setKey("message").setValue("你在干什么?" + "/n").build());
            
             //创建 赋值结束的 Build  并生成 MessageInfo对象
             MessageInfo messageInfo = builder.build();
             //将messageInfo转换为字节
             byte[] messageByte = messageInfo.toByteArray();
            
             //获得此对象的长度
             ChannelBuffer channelBuffer = ChannelBuffers.buffer(messageByte.length);
             //将 获得到的数组写入 channelBuffer中
             channelBuffer.writeBytes(messageByte);
             //发送到服务器端
             for(int i = 0; i < 10;i++){
             lastWriteFuture = channel.write(channelBuffer);
             }
             if (lastWriteFuture != null) {
              lastWriteFuture.awaitUninterruptibly();
          }
        // }
        //     Thread.sleep(50000);
         // Wait until all messages are flushed before closing the channel.
       
            Thread.sleep(50000);
         // Close the connection.  Make sure the close operation ends because
         // all I/O operations are asynchronous in Netty.
         channel.close().awaitUninterruptibly();

         // We should shut down all thread pools here to exit normally.
         // However, it is just fine to call System.exit(0) because we are
         // finished with the business.
         System.exit(0);
}
}



分享到:
评论
7 楼 你好javaword 2014-10-17  
    
6 楼 asialee 2013-10-18  
朋在无锡 写道

可以将Channels使用静态导入的方式:
import static org.jboss.netty.channel.Channels;

的确是这样,但是特别容易忽略,静态导入的位置和方式比较难注意到,我也是一行行对比代码才发现。

是呀,静态导入有这样的问题
5 楼 朋在无锡 2013-10-18  

可以将Channels使用静态导入的方式:
import static org.jboss.netty.channel.Channels;

的确是这样,但是特别容易忽略,静态导入的位置和方式比较难注意到,我也是一行行对比代码才发现。
4 楼 asialee 2013-09-13  
朋在无锡 写道
  ChannelPipeline pipeline = pipeline();
这个漏掉了,对于菜鸟来说影响很大,希望后来者注意,
正确如下:
  ChannelPipeline pipeline =Channels.pipeline();



可以将Channels使用静态导入的方式:
import static org.jboss.netty.channel.Channels;
3 楼 朋在无锡 2013-09-11  
  ChannelPipeline pipeline = pipeline();
这个漏掉了,对于菜鸟来说影响很大,希望后来者注意,
正确如下:
  ChannelPipeline pipeline =Channels.pipeline();

2 楼 asialee 2013-02-20  
博主这块写的挺好的,我完全看了,不过我也写了一些关于编解码器的,请批评指正http://asialee.iteye.com/blog/1769508
1 楼 chxiaowu 2012-06-14  
Good!

相关推荐

    android netty使用demo

    总之,"android netty使用demo"提供了一个基础的Android Netty应用实例,展示了如何在Android环境中使用Netty实现长连接通信。然而,实际开发中,还需要考虑更多的细节和优化,以满足复杂的网络应用场景。

    Netty使用心得,Netty实战

    在深入探讨 Netty 的使用心得和实战应用之前,先来了解一下 Netty 的核心特性。 1. **异步非阻塞 I/O**:Netty 采用基于通道(Channel)和事件处理器(EventLoop)的异步模型,使得它能够高效处理大量并发连接,...

    Netty4 使用

    在本文中,我们将深入探讨Netty 4的使用,包括其核心概念、对象传输、通信封装以及如何利用它来创建高效的服务。 **Netty 4的核心特性** 1. **异步事件驱动**:Netty 使用非阻塞I/O模型,通过EventLoop(事件循环...

    NettyDemo Netty使用实例,对象传递调用

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...掌握Netty的使用,将极大地提升我们在分布式系统、游戏服务器、RPC服务等领域的开发能力。

    Android使用Netty网络框架实践(客户端、服务端)

    本实践将详细介绍如何在Android环境中使用Netty进行客户端和服务端的通信。 首先,我们需要理解Netty的基本概念。Netty的核心是其EventLoopGroup(事件循环组),它负责处理I/O事件,并将它们分发到相应的...

    NettyAndroid,Netty在Android中的使用

    Netty在Android开发中的应用实战系列(二)——— Encoder | Decoder | Handler 的使用:https://azhon.blog.csdn.net/article/details/100831777 Netty在Android开发中的应用实战系列(三)——— 心跳处理 | 断线...

    Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码.rar

    Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码;Netty中使用WebSocket实现服务端与客户端的长连接通信发送消息示例代码

    使用netty进行rtsp服务端开发.zip

    在本文中,我们将深入探讨如何使用Netty进行RTSP(Real Time Streaming Protocol)服务端的开发,以及如何处理H264、H265和AAC格式的流媒体文件。 1. RTSP简介: RTSP是一种应用层协议,主要用于控制实时流传输,如...

    spring+netty+mybatis整合实例

    在IT行业中,Spring、Netty和MyBatis是三个非常重要的框架,它们分别在不同的领域发挥着关键作用。Spring是一个全面的Java应用框架,提供强大的依赖注入、AOP(面向切面编程)以及丰富的功能模块;Netty则是一个高...

    Netty 使用Disruptor机制的处理源代码

    标题 "Netty 使用Disruptor机制的处理源代码" 暗示我们将探讨如何在 Netty 中集成并利用 Disruptor 来优化消息处理流程。Disruptor 的核心是其环形缓冲区(Ring Buffer),这个缓冲区能够在无锁或极少锁的情况下实现...

    Android Netty的使用(结合RxJava)

    在Android上使用Netty,可以利用其非阻塞I/O模型,减少CPU资源消耗,尤其适合处理大量并发连接。 1. **Netty的基本组件**: - **Bootstrap**: 服务器启动引导类,用于配置服务器端的参数。 - **Channel**: 网络...

    使用netty5进行udp网络通讯

    这个小程序使用netty5进行udp网络通讯,客户端有两种,1:用netty5类库发送DatagramPacket和接收 2:直接使用DatagramSocket发送接收DatagramPacket 先运行netty_server的QuoteOfTheMomentServer, 在运行netty_...

    resteasy使用netty

    RestEasy与Netty结合使用,可以构建高性能的RESTful服务,摆脱传统的Servlet容器如Tomcat的依赖。RestEasy是JBoss公司开发的一个Java框架,它实现了JSR 311和JSR 339(Java API for RESTful Web Services)标准,...

    Netty的UDP通信心得

    Netty的UDP通信心得

    Netty主要使用类讲解.docx

    以下是对Netty中一些关键类和使用步骤的详细解释: 1. **Channel** - Channel 是Netty中的核心概念,它代表了网络连接,可以是TCP、UDP、Unix Domain Socket等。Channel 对象用于读写数据,例如Nio...

    netty的入门经典例子的使用

    这个“netty的入门经典例子的使用”很可能是为了引导初学者了解和掌握 Netty 的基本用法和核心概念。下面将详细阐述 Netty 的基础知识及其在实际应用中的关键点。 首先,Netty 提供了一个高度可定制的事件驱动的...

    netty部分参考个人心得

    * 互联网行业:分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为一步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。 * 游戏行业:手游服务端和大型的网络游戏,...

    Netty使用与实现.pdf

    ### Netty的使用与实现详解 #### 一、Netty简介 Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器与客户端。它简化了网络程序的开发过程,提供了丰富的特性支持,使得...

    Netty使用初步

    《Netty使用初步》 Netty是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。它广泛应用于各种分布式系统、云计算平台以及游戏服务器等场景。本篇文章将深入探讨Netty...

    Netty实战 电子版.pdf_java_netty_服务器_

    4. **零拷贝**:Netty通过使用FileRegion实现零拷贝,减少了CPU对数据的拷贝次数,提升了性能。 5. **强大的协议支持**:Netty内置了多种常见网络协议的支持,如HTTP、WebSocket、FTP、SMTP等,以及自定义协议的...

Global site tag (gtag.js) - Google Analytics