`

Netty基础

 
阅读更多

熟悉使用netty,写博客备忘

 

1.首先写服务端代码,依赖的类后面体现

public class MessageServer {

public static void main(String[] args) {
ConstantStep.getInsatnce().printStep();
System.out.println("MessageServer start..");
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));

// 与下面的addLast操作选择一个即可
bootstrap.setPipelineFactory(new MessageServerPipelineFactory());

//bootstrap.getPipeline().addLast("decoder", new MessageDecoder());
//bootstrap.getPipeline().addLast("encoder", new MessageEncoder());
//bootstrap.getPipeline().addLast("handler", new MessageServerHandler());

// Bind and start to accept incoming connections
bootstrap.bind(new InetSocketAddress("localhost",9550));
}
}

2.相应的客户端:

public class MessageClient {

public static void main(String[] args) {
ConstantStep.getInsatnce().printStep();
System.out.println("MessageClient start..");
start();
}

private static void start() {
String host = "127.0.0.1";
int port = 9550;
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the event pipeline factory.
// 不能既有MessageServerPipelineFactory,又有bootstrap.getPipeline().addLast()这个方法
bootstrap.setPipelineFactory(new MessageClientPipelineFactory());

//bootstrap.getPipeline().addLast("decoder", new MessageDecoder());
//bootstrap.getPipeline().addLast("encoder", new MessageEncoder());
//bootstrap.getPipeline().addLast("handler", new MessageClientHandler());

// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is closed or the connection attempt fails
future.getChannel().getCloseFuture().awaitUninterruptibly();

// Shut down thread pools to exit
future.getChannel().write("--------Shut down thread pools to exit: future getChannel");

//release resource
bootstrap.releaseExternalResources();
}
}

3.服务端的handler

public class MessageServerHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(MessageServerHandler.class);

private final AtomicLong transferredBytes = new AtomicLong();

@Override
public void messageReceived(ChannelHandlerContext cxt, MessageEvent event){
ConstantStep.getInsatnce().printStep();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// /ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
transferredBytes.addAndGet(((ChannelBuffer) event.getMessage()).readableBytes());
System.out.println("MessageServerHandler : messageReceived, length = "
+ ((ChannelBuffer) event.getMessage()).readableBytes()
+ transferredBytes.byteValue());
event.getChannel().write(event.getMessage());
System.out.println("Client: messageReceived ," + event.getMessage()
+ ",time = " + System.currentTimeMillis());
}

@Override
public void exceptionCaught(ChannelHandlerContext cxt, ExceptionEvent event) {
ConstantStep.getInsatnce().printStep();
logger.log(Level.WARNING, "******ServerHandler: exceptionCaught", event.getCause());
System.out.println("ServerHandler: exceptionCaught ******" + event.getCause().getLocalizedMessage());
event.getChannel().close();
}

@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event)
throws Exception {
ConstantStep.getInsatnce().printStep();
System.out.println("ServerHandler........channelOpen, channel id = " + event.getChannel().getId());
super.channelOpen(ctx, event);
}

@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
ConstantStep.getInsatnce().printStep();
System.out.println("ServerHandler........channelConnected, context name = " + ctx.getName());
super.channelConnected(ctx, e);
}


}
4.客户端的handler

public class MessageClientHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(MessageClientHandler.class);

//private ChannelBuffer responseBuffer = ChannelBuffers.dynamicBuffer();
private static byte[] content = {'A','2','3','4','5','7','8','9','e','a','b','c','d','f','g','h','i','j'};

@Override
public void channelConnected(ChannelHandlerContext cxt, ChannelStateEvent event)
throws Exception {
ConstantStep.getInsatnce().printStep();
//System.out.println("当连接到服务器的时候,就开始发送256字节的字符串");
logger.info("channelConnected: client try to connect server...");
//当messageReceived接收到服务器的消息时,又把消息发送给服务器
event.getChannel().write(nextMessage());//this.getTestString(256)
super.channelConnected(cxt, event);
}

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
ConstantStep.getInsatnce().printStep();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
//System.out.println("Client: messageReceived , buffer.readableBytes = " + buffer.readableBytes());
System.out.println("Client: messageReceived ," + event.getMessage()
+ ",time = " + System.currentTimeMillis());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
event.getChannel().write(event.getMessage());
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
ConstantStep.getInsatnce().printStep();
//logger.log(Level.WARNING, "CLIENT: exceptionCaught " , event.getCause());
System.out.println(event.getCause().getMessage());
ctx.getChannel().close();
}

public static String getTestString(int size) {
ConstantStep.getInsatnce().printStep();
StringBuilder sb = new StringBuilder(20);
for(int i=0; i<size; i++) {
sb.append("A");
}
return sb.toString();
}

public static ChannelBuffer nextMessage() {
ConstantStep.getInsatnce().printStep();
return ChannelBuffers.wrappedBuffer(content);
}
}

5.MessageServerPipelineFactory:

public class MessageServerPipelineFactory implements ChannelPipelineFactory {

@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
//在链条 加上各种处理器 ,注意顺序
pipeline.addLast("handler", new MessageServerHandler());
pipeline.addLast("decoder", new ServerMessageDecoder());
pipeline.addLast("encoder", new ServerMessageEncoder());

return pipeline;
}

}

6.类似的MessageClientPipelineFactory

public class MessageClientPipelineFactory implements ChannelPipelineFactory {

@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("handler", new MessageClientHandler());
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());

return pipeline;
}

}

7.ServerMessageDecoder:

public class ServerMessageDecoder extends FrameDecoder {

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("ServerDDDDD: MessageDecoder: decode() ,msg : "
+ buffer.getClass()
+ ",time = " + System.currentTimeMillis());
//对应前面的编码,先读取4字节的int.如发现不够4字节,则直接返回null,累积到下一次读取。
//if(buffer.readableBytes()<4) return null;
//int dataLength = buffer.getInt(buffer.readerIndex());
//如果发现读取的数据长度不够,则累积到下一次读取
//if(buffer.readableBytes() < (dataLength + 4)) return null;
//buffer.skipBytes(4);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
return new String(bytes);
}
}

8.ServerMessageEncoder:

public class ServerMessageEncoder extends OneToOneDecoder {

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("Server EEEEE: MessageEncoder: decode() ,msg : "
+ msg.getClass() + ", info = " + msg
+ ",time = " + System.currentTimeMillis());
if(!(msg instanceof String)) {
return msg;
}
//开始编码:将要发送的字符串转换成字节,数据包头是一个4字节的int,后面就是字符串byte
String str = (String)msg;
byte[] bytes = str.getBytes();
int length = bytes.length;
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeInt(length);
cb.writeBytes(bytes);
return cb;
//return msg;
}
}

9.MessageEncoder:

public class MessageEncoder extends OneToOneDecoder {

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("EEEEE: MessageEncoder: decode() ,msg : "
+ msg.getClass() + ", info = " + msg
+ ",time = " + System.currentTimeMillis());
if(!(msg instanceof String)) {
return msg;
}
//开始编码:将要发送的字符串转换成字节,数据包头是一个4字节的int,后面就是字符串byte
String str = (String)msg;
byte[] bytes = str.getBytes();
int length = bytes.length;
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeInt(length);
cb.writeBytes(bytes);
return cb;
//return msg;
}
}

10:MessageDecoder:

public class MessageDecoder extends FrameDecoder {

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);//测试用
System.out.println("DDDDD: MessageDecoder: decode() ,msg : "
+ buffer.getClass()
+ ",time = " + System.currentTimeMillis());
//对应前面的编码,先读取4字节的int.如发现不够4字节,则直接返回null,累积到下一次读取。
//if(buffer.readableBytes()<4) return null;
//int dataLength = buffer.getInt(buffer.readerIndex());
//如果发现读取的数据长度不够,则累积到下一次读取
//if(buffer.readableBytes() < (dataLength + 4)) return null;
//buffer.skipBytes(4);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
return new String(bytes);
}
}

11:ConstantStep

public class ConstantStep {

private int step = 0;

private static ConstantStep instance = new ConstantStep();

private ConstantStep() {

}

public synchronized static ConstantStep getInsatnce(){//
return instance;
}

public synchronized void printStep() {
System.out.println(" now step : " + step++);
}

}

分享到:
评论

相关推荐

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    Netty基础,用于学习Netty,参考黑马程序员的netty教程

    unit3.2.rar netty基础类netty基础类netty基础类netty基础类netty基础类

    本节我们将深入探讨Netty的基础类和核心概念,帮助你构建坚实的Netty知识体系。 1. **ByteBuf**: Netty中的核心数据结构是`ByteBuf`,它是Java NIO `ByteBuffer`的增强版。`ByteBuf`提供了更高效的内存管理,支持...

    Netty基础应用实战.docx

    本文档是Netty的基础应用实战教程,涵盖了从环境设置到高级特性的全面讲解。 首先,Netty 的核心特性是其异步和数据驱动的模式,这种模式允许在高并发场景下高效地处理I/O事件。Netty 的设计使得网络编程变得更加...

    netty基础说明-结构分析-网关了解

    java-netty图,分析选择器 java-netty图,分析IO java-netty图,分析管道 java-netty图,分析缓存

    Netty 基础知识点学习汇总

    设计基础知识,初学者可以参照

    netty官网学习手册中文版

    这个“netty官网学习手册中文版”针对的是Netty的3.1版本,虽然现在的Netty已经发展到了5.x版本,但3.1版本的知识仍然具有历史参考价值,特别是对于那些初次接触或需要理解Netty基础概念的开发者来说。 1. **Netty...

    Netty权威指南第二版

    内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...

    Netty权威指南 第2版 带书签目录 完整版

    内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...

    netty-socketio api接口文档.7z

    1. **Netty基础**:Netty的核心是它的Channel和EventLoop系统。Channel是网络连接的抽象,负责读写数据。EventLoop是事件处理循环,处理I/O事件并调度任务。了解这些基础概念是理解和使用Netty的关键。 2. **...

    Netty权威指南 第2版 带书签目录 高清完整版.pdf

    内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...

    Netty权威指南 第2版

    netty权威指南 第二版是一本Netty架构学习指南,由李林锋...通过阅读netty权威指南 第二版,读者不仅能够掌握Netty基础功能的使用和开发,更能够掌握Netty核心类库的原理和使用约束,从而在实际工作中更好地使用Netty。

    Netty权威指南(第2版)

    内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...

    netty学习教程

    1. **Netty基础** - Netty架构:理解Netty的核心组件,如EventLoop、Channel、Pipeline和Buffer。 - 异步事件驱动:讲解Netty如何通过非阻塞I/O实现高效的网络通信。 - ByteBuf:Netty自定义的字节缓冲区,比Java...

    netty1.zip

    Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...同时,配合上文提到的 Netty 基础概念,这将极大地提升你在网络编程和高性能服务器开发方面的技能。

    netty快速入门教程9集 共12集

    1. **Netty基础架构**:了解Netty的基本组件,如BossGroup、WorkerGroup、Channel、EventLoop等,它们如何协同工作以实现高效的网络通信。 2. **ByteBuf与缓冲区管理**:ByteBuf是Netty中的缓冲区实现,它提供了一...

    netty的视频90集

    1. **Netty基础**: - Netty的基本概念:了解Netty是什么,它如何简化Java网络编程,以及它的核心组件,如Channel、EventLoop、Bootstrap等。 - 异步模型:Netty采用NIO(非阻塞I/O)模型,通过事件驱动方式实现高...

    netty案例.zip

    1. **Netty基础架构**: Netty采用了Reactor模式,它是一种处理并发I/O事件的设计模式,主要由Selector(选择器)、ServerBootstrap(引导类)和Channel(通道)等组件构成。在这个案例中,你会看到ServerBootstrap...

    Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议

    1. **Netty基础架构**: Netty的核心组件包括Bootstrap、ServerBootstrap、Channel、ChannelHandler和EventLoopGroup。Bootstrap和ServerBootstrap分别用于创建客户端和服务端连接;Channel代表网络连接;...

    Netty 入门与实战:仿写微信 IM 即时通讯系统.zip

    1. **Netty基础**:介绍Netty的基本架构,如BossGroup、WorkerGroup、Channel、Pipeline等核心概念,以及它们在处理网络通信中的作用。 2. **Netty ChannelHandler**:讲解ChannelHandler的实现原理,如何编写...

Global site tag (gtag.js) - Google Analytics