- 浏览: 63162 次
- 性别:
- 来自: 上海
文章分类
最新评论
-
u014549257:
...
Apache Mina: StreamIoHandler传输文件处理 -
至尊包:
想问一下,这个官网的列子如果要兼容3.0以下的版本要怎么处理? ...
Swipe Views (水平分页)
本例子根据mina自带的例子:sumup改写。
1. 基本原理:
1) 客户端向服务端发送AddMessage对象时,先根据AddMessageEncoder编码, 当服务端接收到AddMessage后,根据自定义的AddMessageDecode解码数据。
2) 服务端数据解码后,生成回复对象ResultMessage,并对该对象通过ResultMessageEncoder进行编码,并发送到客户端。 客户端接收ResultMessage后,根据ResultMessageDecoder解码,并将数据显示出来。
3) AddMessageEncoder,ResultMessageEncoder,AddMessageDecoder,ResultMessageDecoder自定义编码和解码的方法。
2. 代码:
1) 创建AddMessage和ResultMessage对象
AbstractMessage.java
AddMessage.java
ResultMessage.java
2) 创建AddMessageEncoder和ResultMessageEncoder
AddMessageEncoder.java
ResultMessageEncoder.java
3) 创建AddMessageDecoder和ResultMessageDecoder
AddMessageDecoder.java
ResultMessageDecoder.java
4) 创建CodecFactory,注册AddMessageEncoder, ResultMessageEncoder, AddMessageDecoder, ResultMessageDecoder.
5) 创建server端服务和业务处理ServerIoHandler
Server.java
ServerIoHandler.java
6) 创建Client端连接和业务处理ClientIoHandler
Client.java
ClientIoHandler.java
7. Constants.java
1. 基本原理:
1) 客户端向服务端发送AddMessage对象时,先根据AddMessageEncoder编码, 当服务端接收到AddMessage后,根据自定义的AddMessageDecode解码数据。
2) 服务端数据解码后,生成回复对象ResultMessage,并对该对象通过ResultMessageEncoder进行编码,并发送到客户端。 客户端接收ResultMessage后,根据ResultMessageDecoder解码,并将数据显示出来。
3) AddMessageEncoder,ResultMessageEncoder,AddMessageDecoder,ResultMessageDecoder自定义编码和解码的方法。
2. 代码:
1) 创建AddMessage和ResultMessage对象
AbstractMessage.java
public abstract class AbstractMessage implements Serializable { private int sequence; public int getSequence() { return sequence; } public void setSequence(int sequence) { this.sequence = sequence; } }
AddMessage.java
public class AddMessage extends AbstractMessage { private static final long serialVersionUID = -735205238699949292L; private int value; public AddMessage(){ } public int getValue() { return value; } public void setValue(int value) { this.value = value; } @Override public String toString() { return "AddMessage [value=" + value + ", getSequence()=" + getSequence() + "]"; } }
ResultMessage.java
public class ResultMessage extends AbstractMessage { private static final long serialVersionUID = 7431899532938146290L; private boolean ok; private int value; public ResultMessage(){ } public boolean isOk() { return ok; } public void setOk(boolean ok) { this.ok = ok; } public int getValue() { return value; } public void setValue(int value) { this.value = value; } @Override public String toString() { return "ResultMessage [ok=" + ok + ", value=" + value + ", getSequence()=" + getSequence() + "]"; } }
2) 创建AddMessageEncoder和ResultMessageEncoder
AddMessageEncoder.java
public class AddMessageEncoder<T extends AddMessage> implements MessageEncoder<T> { @Override public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf = IoBuffer.allocate(16); buf.setAutoExpand(true); // Enable auto-expand for easier encoding // Encode a header buf.putInt(message.getSequence()); buf.putInt(message.getValue()); buf.flip(); out.write(buf); } }
ResultMessageEncoder.java
public class ResultMessageEncoder<T extends ResultMessage> implements MessageEncoder<T> { @Override public void encode(IoSession session, T message, ProtocolEncoderOutput out) throws Exception { IoBuffer buf = IoBuffer.allocate(16); buf.setAutoExpand(true); // Enable auto-expand for easier encoding if(message.isOk()){ buf.putShort((short) Constants.RESULT_OK); buf.putInt(message.getSequence()); buf.putInt(message.getValue()); }else{ buf.putShort((short)Constants.RESULT_ERROR); buf.putInt(message.getSequence()); buf.putInt(0); } buf.flip(); out.write(buf); } }
3) 创建AddMessageDecoder和ResultMessageDecoder
AddMessageDecoder.java
public class AddMessageDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } // Return NOT_OK if not matches. return MessageDecoderResult.OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } int sequence = in.getInt(); int value = in.getInt(); AddMessage m = new AddMessage(); m.setSequence(sequence); m.setValue(value); out.write(m); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub } }
ResultMessageDecoder.java
public class ResultMessageDecoder implements MessageDecoder { @Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } int code = in.getShort(); if(code==Constants.RESULT_OK){ return MessageDecoderResult.OK; } // Return NOT_OK if not matches. return MessageDecoderResult.NOT_OK; } @Override public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } int code = in.getShort(); int sequence = in.getInt(); int value = in.getInt(); ResultMessage m = new ResultMessage(); if(code==Constants.RESULT_OK){ m.setOk(true); m.setSequence(sequence); m.setValue(value); }else{ m.setOk(false); } out.write(m); return MessageDecoderResult.OK; } @Override public void finishDecode(IoSession session, ProtocolDecoderOutput out) throws Exception { // TODO Auto-generated method stub } }
4) 创建CodecFactory,注册AddMessageEncoder, ResultMessageEncoder, AddMessageDecoder, ResultMessageDecoder.
public class DemoProtocolCodecFactory extends DemuxingProtocolCodecFactory { public DemoProtocolCodecFactory(boolean isServer){ if(isServer){ super.addMessageEncoder(ResultMessage.class, ResultMessageEncoder.class); super.addMessageDecoder(AddMessageDecoder.class); }else{ super.addMessageEncoder(AddMessage.class, AddMessageEncoder.class); super.addMessageDecoder(ResultMessageDecoder.class); } } }
5) 创建server端服务和业务处理ServerIoHandler
Server.java
public class Server { public void init() throws IOException{ IoAcceptor acceptor = new NioSocketAcceptor(); acceptor.getFilterChain().addLast("logger", new LoggingFilter()); acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new DemoProtocolCodecFactory(Constants.IS_SERVER))); acceptor.setHandler(new ServerIoHandler()); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10); acceptor.setDefaultLocalAddress(new InetSocketAddress(Constants.PORT)); acceptor.bind();// 启动监听 } public static void main(String[] args) throws IOException { Server server = new Server(); server.init(); } }
ServerIoHandler.java
public class ServerIoHandler extends IoHandlerAdapter { private static final String SUM_KEY = "sum"; @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { System.out.println(cause.getMessage()); cause.printStackTrace(); session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { AddMessage add = (AddMessage) message; int sum = ((Integer)session.getAttribute(SUM_KEY)).intValue(); int value = add.getValue(); long total = (long) sum + value; if(total>Integer.MAX_VALUE || total<Integer.MIN_VALUE){ ResultMessage result = new ResultMessage(); result.setSequence(add.getSequence()); result.setOk(false); session.write(result); }else{ sum = (int) total; session.setAttribute(SUM_KEY, sum); ResultMessage result = new ResultMessage(); result.setSequence(add.getSequence()); result.setOk(true); result.setValue(sum); session.write(result); } //System.out.println("total=" + total); } @Override public void sessionIdle(IoSession session, IdleStatus status) throws Exception { session.close(true); } @Override public void sessionOpened(IoSession session) throws Exception { session.setAttribute(SUM_KEY, new Integer(0)); } }
6) 创建Client端连接和业务处理ClientIoHandler
Client.java
public class Client { public void init() throws InterruptedException{ NioSocketConnector connector = new NioSocketConnector(); // Configure the service. connector.setConnectTimeoutMillis(Constants.CONNECT_TIMEOUT); connector.getFilterChain().addLast("codec",new ProtocolCodecFilter(new DemoProtocolCodecFactory(!Constants.IS_SERVER))); connector.getFilterChain().addLast("logger", new LoggingFilter()); connector.setHandler(new ClientIoHandler()); IoSession session; for (;;) { try { ConnectFuture future = connector.connect(new InetSocketAddress(Constants.HOSTNAME, Constants.PORT)); future.awaitUninterruptibly(); session = future.getSession(); break; } catch (RuntimeIoException e) { System.err.println("Failed to connect."); e.printStackTrace(); Thread.sleep(5000); } } // wait until the summation is done session.getCloseFuture().awaitUninterruptibly(); connector.dispose(); } /** * @param args * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { Client client = new Client(); client.init(); } }
ClientIoHandler.java
public class ClientIoHandler extends IoHandlerAdapter { private List<Integer> values = new ArrayList<Integer>(); @Override public void exceptionCaught(IoSession session, Throwable cause) throws Exception { session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { ResultMessage rm = (ResultMessage) message; if (rm.isOk()) { if (rm.getSequence() == values.size() - 1) { System.out.println("the sum is " + rm.getValue()); session.close(true); //finished = true; } } else { System.out.println("Server error, disconnecting..."); session.close(true); //finished = true; } } @Override public void sessionOpened(IoSession session) throws Exception { init(); for (int i = 0; i < values.size(); i++) { int _value = ((Integer) values.get(i)).intValue(); AddMessage m = new AddMessage(); m.setSequence(i); m.setValue(_value); session.write(m); } } private void init(){ for(int i=0;i<3;i++){ int _value = i*100 + 1; values.add(new Integer(_value)); } } }
7. Constants.java
public class Constants { public final static int PORT = 9123; public static final String HOSTNAME = "localhost"; public static final long CONNECT_TIMEOUT = 30*1000L; // 30 seconds public final static boolean IS_SERVER = true; public static final int RESULT_OK = 0; public static final int RESULT_ERROR = 1; }
- minaServer.rar (60 KB)
- 下载次数: 4
相关推荐
import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class EchoServer { private IoAcceptor acceptor = new ...
### Apache MINA线程模型配置详解 #### 一、线程模型配置介绍 Apache MINA 是一个用于构建网络应用程序的高性能、高可靠性的框架。它提供了丰富的功能来简化网络编程,包括TCP/IP 和 UDP/IP 协议的支持。线程模型...
4. **Protocol Codec**:编码和解码是网络通信中的重要环节,Mina提供了ProtocolCodec接口,允许开发者自定义协议的编码和解码规则。 5. **服务端与客户端构建**:学习如何使用Mina API创建服务端和客户端,监听...
实现这个功能,我们需要创建一个新的编码器类,继承自Mina提供的基础编码器,如`org.apache.mina.filter.codec.ProtocolEncoder`或`org.apache.mina.filter.codec.ProtocolCodecFilter`。 在编码器的实现中,我们...
Apache Mina是一个开源项目,由Apache软件基金会维护,它是一个网络通信框架,专注于提供高性能、高度可扩展的网络应用程序开发工具。本篇文章将带你深入理解Apache Mina的基础知识,包括其设计理念、核心组件以及...
Apache Mina 框架是一个强大的网络通信应用框架,它主要针对基于TCP/IP和UDP/IP的协议栈,同时也支持Java对象序列化和其他通信方式。Mina 的核心设计目标是帮助开发者快速构建高性能、高可扩展性的网络应用。它采用...
4. **过滤器机制**:Mina的过滤器链允许开发者在数据传输过程中添加自定义逻辑,如数据加密、身份验证、日志记录等。过滤器可以按顺序或条件执行,增强了系统的可扩展性和灵活性。 5. **强大的会话管理**:Mina的...
### Apache Mina 详解 #### 一、Apache Mina 概述 Apache Mina 是一个高性能、可扩展的网络通信框架,它简化了基于 TCP/IP 和 UDP/IP 协议栈的网络应用程序的开发过程。Mina 提供了一个事件驱动、异步的操作模式,...
import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class MyServer { public static void main(String[] args) { ...
Apache Mina是一个开源框架,主要用于构建高性能、高可用性的网络通信应用。在Mina 1.1.7版本中,提供的示例源码是学习和理解Mina框架工作原理及其实现各种网络协议的重要资源。这个压缩包"apache.mina.example"包含...
Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高度可扩展且高性能的网络应用程序框架,主要用于简化网络编程,尤其是TCP/IP和UDP/IP协议的应用。MINA的设计目标是为开发者提供一个...
Apache Mina是一个开源框架,主要用于简化网络应用的开发,尤其是基于TCP和UDP协议的应用。它提供了高度可扩展的事件驱动模型,使得开发者可以专注于业务逻辑,而无需关注底层的网络通信细节。本入门实例将带你了解...
import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.filter.executor.OrderedThreadPoolExecutor; import org.apache.mina....
import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class ...
import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class MinaServer { ...
import org.apache.mina.filter.codec.textline.TextLineCodecFactory; import org.apache.mina.transport.socket.nio.NioSocketAcceptor; public class Demo1Server { private static final org.slf4j.Logger ...
4. **丰富的API**:Mina提供了一套全面的API,包括Filter Chain(过滤器链)、Session(会话)和Protocol Codec(协议编解码器)等接口,使得开发者可以通过组合不同的组件来实现复杂的功能。 5. **模块化设计**:...
Apache MINA(Multipurpose Infrastructure for Network Applications)是一个高性能、异步事件驱动的网络应用程序框架,主要用于简化开发高质量的网络服务。MINA的目标是为开发者提供一个简单易用但功能强大的库,...