熟悉使用netty,写博客备忘
1.首先写服务端代码,依赖的类后面体现
public class MessageServer {
public static void main(String[] args) {
ConstantStep.getInsatnce().printStep();
System.out.println("MessageServer start..");
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// 与下面的addLast操作选择一个即可
bootstrap.setPipelineFactory(new MessageServerPipelineFactory());
//bootstrap.getPipeline().addLast("decoder", new MessageDecoder());
//bootstrap.getPipeline().addLast("encoder", new MessageEncoder());
//bootstrap.getPipeline().addLast("handler", new MessageServerHandler());
// Bind and start to accept incoming connections
bootstrap.bind(new InetSocketAddress("localhost",9550));
}
}
2.相应的客户端:
public class MessageClient {
public static void main(String[] args) {
ConstantStep.getInsatnce().printStep();
System.out.println("MessageClient start..");
start();
}
private static void start() {
String host = "127.0.0.1";
int port = 9550;
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the event pipeline factory.
// 不能既有MessageServerPipelineFactory,又有bootstrap.getPipeline().addLast()这个方法
bootstrap.setPipelineFactory(new MessageClientPipelineFactory());
//bootstrap.getPipeline().addLast("decoder", new MessageDecoder());
//bootstrap.getPipeline().addLast("encoder", new MessageEncoder());
//bootstrap.getPipeline().addLast("handler", new MessageClientHandler());
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is closed or the connection attempt fails
future.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit
future.getChannel().write("--------Shut down thread pools to exit: future getChannel");
//release resource
bootstrap.releaseExternalResources();
}
}
3.服务端的handler
public class MessageServerHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(MessageServerHandler.class);
private final AtomicLong transferredBytes = new AtomicLong();
@Override
public void messageReceived(ChannelHandlerContext cxt, MessageEvent event){
ConstantStep.getInsatnce().printStep();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
// /ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
transferredBytes.addAndGet(((ChannelBuffer) event.getMessage()).readableBytes());
System.out.println("MessageServerHandler : messageReceived, length = "
+ ((ChannelBuffer) event.getMessage()).readableBytes()
+ transferredBytes.byteValue());
event.getChannel().write(event.getMessage());
System.out.println("Client: messageReceived ," + event.getMessage()
+ ",time = " + System.currentTimeMillis());
}
@Override
public void exceptionCaught(ChannelHandlerContext cxt, ExceptionEvent event) {
ConstantStep.getInsatnce().printStep();
logger.log(Level.WARNING, "******ServerHandler: exceptionCaught", event.getCause());
System.out.println("ServerHandler: exceptionCaught ******" + event.getCause().getLocalizedMessage());
event.getChannel().close();
}
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent event)
throws Exception {
ConstantStep.getInsatnce().printStep();
System.out.println("ServerHandler........channelOpen, channel id = " + event.getChannel().getId());
super.channelOpen(ctx, event);
}
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
ConstantStep.getInsatnce().printStep();
System.out.println("ServerHandler........channelConnected, context name = " + ctx.getName());
super.channelConnected(ctx, e);
}
}
4.客户端的handler
public class MessageClientHandler extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(MessageClientHandler.class);
//private ChannelBuffer responseBuffer = ChannelBuffers.dynamicBuffer();
private static byte[] content = {'A','2','3','4','5','7','8','9','e','a','b','c','d','f','g','h','i','j'};
@Override
public void channelConnected(ChannelHandlerContext cxt, ChannelStateEvent event)
throws Exception {
ConstantStep.getInsatnce().printStep();
//System.out.println("当连接到服务器的时候,就开始发送256字节的字符串");
logger.info("channelConnected: client try to connect server...");
//当messageReceived接收到服务器的消息时,又把消息发送给服务器
event.getChannel().write(nextMessage());//this.getTestString(256)
super.channelConnected(cxt, event);
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
ConstantStep.getInsatnce().printStep();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//ChannelBuffer buffer = (ChannelBuffer) event.getMessage();
//System.out.println("Client: messageReceived , buffer.readableBytes = " + buffer.readableBytes());
System.out.println("Client: messageReceived ," + event.getMessage()
+ ",time = " + System.currentTimeMillis());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
event.getChannel().write(event.getMessage());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
ConstantStep.getInsatnce().printStep();
//logger.log(Level.WARNING, "CLIENT: exceptionCaught " , event.getCause());
System.out.println(event.getCause().getMessage());
ctx.getChannel().close();
}
public static String getTestString(int size) {
ConstantStep.getInsatnce().printStep();
StringBuilder sb = new StringBuilder(20);
for(int i=0; i<size; i++) {
sb.append("A");
}
return sb.toString();
}
public static ChannelBuffer nextMessage() {
ConstantStep.getInsatnce().printStep();
return ChannelBuffers.wrappedBuffer(content);
}
}
5.MessageServerPipelineFactory:
public class MessageServerPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
//在链条 加上各种处理器 ,注意顺序
pipeline.addLast("handler", new MessageServerHandler());
pipeline.addLast("decoder", new ServerMessageDecoder());
pipeline.addLast("encoder", new ServerMessageEncoder());
return pipeline;
}
}
6.类似的MessageClientPipelineFactory
public class MessageClientPipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("handler", new MessageClientHandler());
pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
return pipeline;
}
}
7.ServerMessageDecoder:
public class ServerMessageDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("ServerDDDDD: MessageDecoder: decode() ,msg : "
+ buffer.getClass()
+ ",time = " + System.currentTimeMillis());
//对应前面的编码,先读取4字节的int.如发现不够4字节,则直接返回null,累积到下一次读取。
//if(buffer.readableBytes()<4) return null;
//int dataLength = buffer.getInt(buffer.readerIndex());
//如果发现读取的数据长度不够,则累积到下一次读取
//if(buffer.readableBytes() < (dataLength + 4)) return null;
//buffer.skipBytes(4);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
return new String(bytes);
}
}
8.ServerMessageEncoder:
public class ServerMessageEncoder extends OneToOneDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("Server EEEEE: MessageEncoder: decode() ,msg : "
+ msg.getClass() + ", info = " + msg
+ ",time = " + System.currentTimeMillis());
if(!(msg instanceof String)) {
return msg;
}
//开始编码:将要发送的字符串转换成字节,数据包头是一个4字节的int,后面就是字符串byte
String str = (String)msg;
byte[] bytes = str.getBytes();
int length = bytes.length;
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeInt(length);
cb.writeBytes(bytes);
return cb;
//return msg;
}
}
9.MessageEncoder:
public class MessageEncoder extends OneToOneDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);
System.out.println("EEEEE: MessageEncoder: decode() ,msg : "
+ msg.getClass() + ", info = " + msg
+ ",time = " + System.currentTimeMillis());
if(!(msg instanceof String)) {
return msg;
}
//开始编码:将要发送的字符串转换成字节,数据包头是一个4字节的int,后面就是字符串byte
String str = (String)msg;
byte[] bytes = str.getBytes();
int length = bytes.length;
ChannelBuffer cb = ChannelBuffers.dynamicBuffer();
cb.writeInt(length);
cb.writeBytes(bytes);
return cb;
//return msg;
}
}
10:MessageDecoder:
public class MessageDecoder extends FrameDecoder {
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ConstantStep.getInsatnce().printStep();
Thread.sleep(100);//测试用
System.out.println("DDDDD: MessageDecoder: decode() ,msg : "
+ buffer.getClass()
+ ",time = " + System.currentTimeMillis());
//对应前面的编码,先读取4字节的int.如发现不够4字节,则直接返回null,累积到下一次读取。
//if(buffer.readableBytes()<4) return null;
//int dataLength = buffer.getInt(buffer.readerIndex());
//如果发现读取的数据长度不够,则累积到下一次读取
//if(buffer.readableBytes() < (dataLength + 4)) return null;
//buffer.skipBytes(4);
byte[] bytes = new byte[buffer.readableBytes()];
buffer.readBytes(bytes);
return new String(bytes);
}
}
11:ConstantStep
public class ConstantStep {
private int step = 0;
private static ConstantStep instance = new ConstantStep();
private ConstantStep() {
}
public synchronized static ConstantStep getInsatnce(){//
return instance;
}
public synchronized void printStep() {
System.out.println(" now step : " + step++);
}
}
相关推荐
Netty基础,用于学习Netty,参考黑马程序员的netty教程
本节我们将深入探讨Netty的基础类和核心概念,帮助你构建坚实的Netty知识体系。 1. **ByteBuf**: Netty中的核心数据结构是`ByteBuf`,它是Java NIO `ByteBuffer`的增强版。`ByteBuf`提供了更高效的内存管理,支持...
本文档是Netty的基础应用实战教程,涵盖了从环境设置到高级特性的全面讲解。 首先,Netty 的核心特性是其异步和数据驱动的模式,这种模式允许在高并发场景下高效地处理I/O事件。Netty 的设计使得网络编程变得更加...
java-netty图,分析选择器 java-netty图,分析IO java-netty图,分析管道 java-netty图,分析缓存
设计基础知识,初学者可以参照
这个“netty官网学习手册中文版”针对的是Netty的3.1版本,虽然现在的Netty已经发展到了5.x版本,但3.1版本的知识仍然具有历史参考价值,特别是对于那些初次接触或需要理解Netty基础概念的开发者来说。 1. **Netty...
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
1. **Netty基础**:Netty的核心是它的Channel和EventLoop系统。Channel是网络连接的抽象,负责读写数据。EventLoop是事件处理循环,处理I/O事件并调度任务。了解这些基础概念是理解和使用Netty的关键。 2. **...
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
netty权威指南 第二版是一本Netty架构学习指南,由李林锋...通过阅读netty权威指南 第二版,读者不仅能够掌握Netty基础功能的使用和开发,更能够掌握Netty核心类库的原理和使用约束,从而在实际工作中更好地使用Netty。
内容不仅包含Java NIO入门知识、Netty 的基础功能开发指导、编解码框架定制等,还包括私有协议栈定制和开发、Netty 核心类库源码分析,以及Netty 的架构剖析。 《Netty 权威指南(第2 版)》适合架构师、设计师、...
1. **Netty基础** - Netty架构:理解Netty的核心组件,如EventLoop、Channel、Pipeline和Buffer。 - 异步事件驱动:讲解Netty如何通过非阻塞I/O实现高效的网络通信。 - ByteBuf:Netty自定义的字节缓冲区,比Java...
Netty 是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。...同时,配合上文提到的 Netty 基础概念,这将极大地提升你在网络编程和高性能服务器开发方面的技能。
1. **Netty基础架构**:了解Netty的基本组件,如BossGroup、WorkerGroup、Channel、EventLoop等,它们如何协同工作以实现高效的网络通信。 2. **ByteBuf与缓冲区管理**:ByteBuf是Netty中的缓冲区实现,它提供了一...
1. **Netty基础**: - Netty的基本概念:了解Netty是什么,它如何简化Java网络编程,以及它的核心组件,如Channel、EventLoop、Bootstrap等。 - 异步模型:Netty采用NIO(非阻塞I/O)模型,通过事件驱动方式实现高...
1. **Netty基础架构**: Netty采用了Reactor模式,它是一种处理并发I/O事件的设计模式,主要由Selector(选择器)、ServerBootstrap(引导类)和Channel(通道)等组件构成。在这个案例中,你会看到ServerBootstrap...
1. **Netty基础架构**: Netty的核心组件包括Bootstrap、ServerBootstrap、Channel、ChannelHandler和EventLoopGroup。Bootstrap和ServerBootstrap分别用于创建客户端和服务端连接;Channel代表网络连接;...
1. **Netty基础**:介绍Netty的基本架构,如BossGroup、WorkerGroup、Channel、Pipeline等核心概念,以及它们在处理网络通信中的作用。 2. **Netty ChannelHandler**:讲解ChannelHandler的实现原理,如何编写...