浏览 4902 次
锁定老帖子 主题:as3和java nio通信的完美例子
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-12-30
package astest; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; /** * ByteArray数据包 * @author zkpursuit */ public class Packet { private ByteBuffer buff; private int length; /** * 默认构造方法 * 开辟2M的缓存区 */ public Packet(){ this(1024); } public Packet(int size){ length=size; buff = ByteBuffer.allocate(length); } public Packet(ByteBuffer buffer){ buff=buffer; length=buffer.limit(); } //写入数据 public void writeChar(char value){ buff.putChar(value); } public void writeByte(byte value){ buff.put(value); } public void writeFloat(float value){ buff.putFloat(value); } public void writeLong(long value){ buff.putLong(value); } public void writeDouble(double value){ buff.putDouble(value); } public void writeInt(int value){ buff.putInt(value); } public void writeShort(short value){ buff.putShort(value); } public void writeBytes(byte[] bytes){ buff.put(bytes); } public void writeString(String str){ byte[] str_bytes=str.getBytes(); short len=(short)(str_bytes.length); writeShort(len); writeBytes(str_bytes); } public void writeString(String str,String charset){ try { byte[] str_bytes=str.getBytes(charset); short len=(short)(str_bytes.length); writeShort(len); writeBytes(str_bytes); } catch (UnsupportedEncodingException e) { System.out.println("不能识别的字符编码"); e.printStackTrace(); System.exit(0); } } //读取数据 public char readChar(){ return buff.getChar(); } public byte readByte(){ return buff.get(); } public float readFloat(){ return buff.getFloat(); } public long readLong(){ return buff.getLong(); } public double readDouble(){ return buff.getFloat(); } public int readInt(){ return buff.getInt(); } public short readShort(){ return buff.getShort(); } public String readString(){ short len=buff.getShort(); byte[] _bytes=new byte[len]; buff.get(_bytes, 0, len); return new String(_bytes); } public String readString(String charset){ short len=buff.getShort(); byte[] _bytes=new byte[len]; buff.get(_bytes, 0, len); try { return new String(_bytes,charset); } catch (UnsupportedEncodingException e) { System.out.println("不能识别的字符编码"); e.printStackTrace(); System.exit(0); } return new String(_bytes); } public ByteBuffer byteBuffer(){ return buff; } public ByteBuffer pack(){ int l=length(); ByteBuffer buffer=ByteBuffer.allocate(l); if(position()>0){ flip(); } buffer.put(array(), 0, l); buffer.flip(); return buffer; } public byte[] array(){ return buff.array(); } public int position(){ return buff.position(); } public void flip(){ buff.flip(); } public void clear(){ buff.clear(); length=0; } /** * 实际存在有用数据的长度 * @return */ public int length(){ return length-buff.remaining(); } /** * 预定义的长度 * @return */ public int totalSize(){ return length; } public void outInfo(byte[] bytes){ for(int i=0;i<bytes.length;i++){ System.out.println("---------"+bytes[i]); } } } package astest; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.nio.charset.CharsetDecoder; import java.nio.charset.CharsetEncoder; import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; /** * * @author Jeff * */ public class NIOServer { private static int BLOCK = 1024; private static String name = ""; public Selector selector; protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK); protected CharsetDecoder decoder; public static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder(); public HashMap<Integer,SelectionKey> rooms=new HashMap<Integer,SelectionKey>(); public List<Integer> ids=new ArrayList<Integer>(); private HashMap<Integer,ICommand> commands=new HashMap<Integer,ICommand>(); public final int CONNECT=1000; public final int SELECT_ROOM=1005; public final int START=1010; public final int RECEIVE_DATA=1015; public final int CONNECT_SUCCESS=2000; public final int SEND_DATA=2005; public NIOServer(int port) throws IOException { selector = this.getSelector(port); Charset charset = Charset.forName("GB2312"); decoder = charset.newDecoder(); } public void registerCommand(int commandID,ICommand command){ if(!commands.containsKey(commandID)){ System.out.println("registeCommand "+commandID+" , "+command); commands.put(commandID, command); } } // 获取Selector protected Selector getSelector(int port) throws IOException { ServerSocketChannel server = ServerSocketChannel.open(); Selector selector = Selector.open(); server.socket().bind(new InetSocketAddress(port)); server.configureBlocking(false); server.register(selector, SelectionKey.OP_ACCEPT); return selector; } // 监听端口 public void listen() { try { while(true){ int num=selector.select(); Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); process(key); } } } catch (IOException e) { e.printStackTrace(); } } // 处理事件 protected void process(SelectionKey key) throws IOException { if (key.isAcceptable()) { // 接收请求 ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); // 设置非阻塞模式 channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { // 读信息 read(key); //clientBuffer.clear(); } else if (key.isWritable()) { // 写事件 if(key.isValid()){ //SocketChannel channel = (SocketChannel) key.channel(); Packet p=new Packet(100); p.writeInt(300); p.writeInt(400); p.writeString("野草工革革", "utf-8"); send(key,p); //int attachment=(Integer)key.attachment(); } } } /** * 读取数据 * @param channel * @throws IOException */ public void read(SelectionKey key) throws IOException{ SocketChannel channel = (SocketChannel) key.channel(); int count = channel.read(clientBuffer); if(count>0){ clientBuffer.flip(); Packet packet=new Packet(clientBuffer); //读取包长 int data_len=packet.readInt(); //读取命令号 int command=packet.readShort(); if(commands.containsKey(command)){ ICommand comm=commands.get(command); int result=comm.execute(this, channel, packet); key.attach(result); //System.out.println("attachment "+key.attachment().getClass()); } clientBuffer.clear(); } } /** * 发送数据 * @param channel * @param packet * @throws IOException */ public int send(SelectionKey key,Packet packet) throws IOException{ return send(key,packet.byteBuffer()); } /** * 发送数据 * @param key * @param buffer * @return 发出Bytes的总长度 * @throws IOException */ public int send(SelectionKey key,ByteBuffer buffer) throws IOException{ SocketChannel channel = (SocketChannel) key.channel(); //发送数据的实际长度 int dataLen=buffer.limit()-buffer.remaining(); if(buffer.position()>0){ buffer.flip(); } //发送的bytes,4为数据包的长度信息,为int型,占用4个字节 ByteBuffer bts=ByteBuffer.allocate(dataLen+4); //写入数据包的长度 bts.putInt(dataLen); //写入数据内容 bts.put(buffer); if(bts.position()>0){ bts.flip(); } int l=channel.write(bts); bts.clear(); buffer.clear(); //注册读事件 channel.register(selector, SelectionKey.OP_READ); //注销写事件 key.interestOps(key.interestOps()&~SelectionKey.OP_WRITE); return l; } public static void main(String[] args) { int port = 8088; try { NIOServer server = new NIOServer(port); server.registerCommand(1000,new ConnectCommand()); System.out.println("listening on " + port); server.listen(); } catch (IOException e) { e.printStackTrace(); } } } package astest; import java.nio.channels.SocketChannel; public interface ICommand { public int execute(NIOServer server,SocketChannel channel,Packet packet); public void write(); } package astest; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.Iterator; public class ConnectCommand implements ICommand { public int execute(NIOServer server,SocketChannel channel,Packet packet){ int playerId=packet.readInt(); Iterator<Integer> iters=server.ids.iterator(); boolean sucessed=false; while(iters.hasNext()){ int id=iters.next(); if(id==playerId){ System.out.println("此用户已连接!!!"); sucessed=true; break; } } if(!sucessed){ server.ids.add(playerId); System.out.println("来自 "+playerId+" 连接"); try { channel.register(server.selector,SelectionKey.OP_WRITE); } catch (ClosedChannelException e) { e.printStackTrace(); } } return 2000; } public void write(){ } } AS部分 package { import flash.display.Sprite; import flash.text.TextField; import flash.text.TextFieldType; import flash.display.Sprite; import flash.events.Event; import flash.events.MouseEvent; import flash.events.IOErrorEvent; import flash.events.SecurityErrorEvent; import flash.events.ProgressEvent; /** * ... * @author zkpursuit */ [SWF(width=400,height=500)] public class Main extends Sprite { private var registPanel:RegistPanel; private var client:NetClient; private var tf:TextField=new TextField(); public function Main() { stage.scaleMode = "noScale"; client = new NetClient(); client.buildConnection(); client.addEventListener(NetEvent.READED_DATA, onReadedDataHandler); client.addEventListener(Event.CONNECT, onConnectServerHandler); registPanel = new RegistPanel(); addChild(registPanel); registPanel.addEventListener(NetEvent.NET_CONNECT, requestConnectServer); tf.border = true; tf.width = 300; tf.height = 400; tf.x=50; tf.y = 50; tf.type = TextFieldType.INPUT; addChild(tf); } private function requestConnectServer(e:NetEvent):void { var p:Packet = new Packet(); p.writeShort(1000); p.writeInt(e.playerID); trace(e.playerID); client.sendPacket(p); } private function onConnectServerHandler(e:Event):void { tf.appendText("已建立连接\n"); } private function onReadedDataHandler(e:NetEvent):void { var p:Packet=new Packet(e.bytesData); var info:String = "---" + p.readInt() + "---" + p.readInt()+"---"+p.readString()+"\n"; tf.appendText(info); } } } package { import flash.display.Sprite; import flash.events.MouseEvent; import flash.text.TextField; import flash.text.TextFieldType; /** * ... * @author zkpursuit */ public class RegistPanel extends Sprite { private var tf:TextField; private var bt:Button; public function RegistPanel() { //this.graphics.beginFill(0xcc9933); //this.graphics.drawRect(0, 0, 200, 200); //this.graphics.endFill(); tf = new TextField(); tf.border = true; tf.type = TextFieldType.INPUT; tf.width = 100; tf.height = 20; addChild(tf); bt = new Button(40, 20); bt.x = tf.width + 10; addChild(bt); bt.addEventListener(MouseEvent.CLICK, onClickHandler); } private function onClickHandler(e:MouseEvent):void { //if (!int(tf.text)) { var event:NetEvent = new NetEvent(NetEvent.NET_CONNECT); event.playerID = (int)(tf.text); dispatchEvent(event); //} } } }/** * var p:Packet=new Packet(); * p.writeInt(40); * p.writeShort(20); * p.writeString("天下大统"); * * p.flip(); * * trace(p.readInt()); * trace(p.readShort()); * trace(p.readString()); */ package{ import flash.utils.ByteArray; /** * 数据包 * @author zkpursuit */ public class Packet { private var bytes:ByteArray; public function Packet(bytes:ByteArray=null){ if(bytes!=null){ this.bytes=bytes; }else{ this.bytes=new ByteArray(); } } public static function warp(bytes:ByteArray):Packet{ return new Packet(bytes); } public function writeShort(value:int):void{ bytes.writeShort(value); } public function writeUShort(value:uint):void{ bytes.writeShort(value); } public function writeInt(value:int):void{ bytes.writeInt(value); } public function writeUint(value:uint):void{ bytes.writeUnsignedInt(value); } public function writeFloat(value:Number):void{ bytes.writeFloat(value); } public function writeDouble(value:Number):void{ bytes.writeDouble(value); } public function writeByte(value:int):void{ bytes.writeByte(value); } public function writeBytes(bytes:ByteArray,offset:int=0,length:int=0):void{ bytes.writeBytes(bytes, offset, length); } public function writeString(value:String,charset:String="UTF-8"):void{ var ba:ByteArray=new ByteArray(); ba.writeMultiByte(value, charset); var len:int=ba.length; writeShort(len); bytes.writeBytes(ba); } public function readShort():int{ return bytes.readShort(); } public function readUShort():int{ return bytes.readUnsignedShort(); } public function readInt():int{ return bytes.readInt(); } public function readUint():uint{ return bytes.readUnsignedInt(); } public function readFloat():Number{ return bytes.readFloat(); } public function readDouble():Number{ return bytes.readDouble(); } public function readByte():int{ return bytes.readByte(); } public function readBytes(offset:int=0,length:int=0):ByteArray{ var _bytes:ByteArray=new ByteArray(); bytes.readBytes(_bytes, offset, length); return _bytes; } public function readString(charset:String="UTF-8"):String{ var str_len:int=readShort(); return bytes.readMultiByte(str_len, charset); } public function flip():void{ bytes.position=0; } public function array():ByteArray{ return bytes; } public function size():int{ return bytes.length; } public function clear():void{ bytes.clear(); } public function set position(value:int):void{ bytes.position=value; } public function get position():int{ return bytes.position; } /** * 得到数的二进制的某一位是否为1 * @param i 原数字 * @param bitNum 第几位 */ public static function getBit(i : uint,bitNum : uint) : Boolean { return (i & (1 << bitNum)) != 0; } /** * 把数字某位改写为1或0 *@param i 原数字 *@param bitNum 第几位 *@param have true 为写为1,false为0 */ public static function setBit(i : uint,bitNum : uint,have : Boolean = true) : uint { var t : uint = i; if(have) t |= (1 << bitNum); else t ^= t & (1 << bitNum); /*if(getBit(i, bitNum) == have)return; else*/ return t; } } }package{ import flash.events.Event; import flash.events.EventDispatcher; import flash.events.IOErrorEvent; import flash.events.ProgressEvent; import flash.events.SecurityErrorEvent; import flash.net.Socket; import flash.utils.ByteArray; import org.osmf.net.NetClient; /** * socket客户端,此类适用于 dataLen + data 的数据协议 * @author zkpursuit */ public class NetClient extends EventDispatcher{ private var socket:Socket; private var byteBuffer:ByteArray; //数据缓存 private var dataLength:int; //接收到的数据长度 private var headLen:int; //数据包首部长度 public function NetClient(){ byteBuffer = new ByteArray(); //数据长度,一般由服务器定义的数据协议中规定的数据包长度的数据类型的字节大小,int占有4个字节 headLen = 4; } /** * 建立socket连接 * @param host 服务器IP地址 * @param port 服务器开放供客户连接的端口 */ public function buildConnection(host:String = "127.0.0.1", port:int = 8088):void { socket = new Socket(host, port); socket.addEventListener(Event.CLOSE, closeHandler); socket.addEventListener(IOErrorEvent.IO_ERROR, ioErrorHandler); socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR, securityErrorHandler); socket.addEventListener(Event.CONNECT, connectHandler); socket.addEventListener(ProgressEvent.SOCKET_DATA, receivedHandler); // socket=new Socket(host,post) // socket.addEventListener(Event.CLOSE,closeHandler) // socket.addEventListener(IOErrorEvent.IO_ERROR,ioErrorHandler) // socket.addEventListener(SecurityErrorEvent.SECURITY_ERROR,securityErrorHamdler) // socket.addEventListener(Event.CONNECT,connectHandler) // socket.addEventListener(ProgressEvent.SOCKET_DATA,receivedHandler) } private function closeHandler(e:Event):void { socket.removeEventListener(Event.CLOSE, closeHandler); socket.removeEventListener(IOErrorEvent.IO_ERROR, ioErrorHandler); socket.removeEventListener(SecurityErrorEvent.SECURITY_ERROR, securityErrorHandler); socket.removeEventListener(Event.CONNECT, connectHandler); socket.removeEventListener(ProgressEvent.SOCKET_DATA, receivedHandler); dispatchEvent(e); trace("服务器已关闭此连接"); } private function ioErrorHandler(e:IOErrorEvent):void { dispatchEvent(e); } private function securityErrorHandler(e:SecurityErrorEvent):void { dispatchEvent(e); } private function connectHandler(e:Event):void { dispatchEvent(e); trace("已建立连接"); } private function receivedHandler(e:ProgressEvent):void { dispatchEvent(e); parse(); } /** * 为避免一次接收到多条数据包,必须进行数据包的分离解码 * 网络数据解码 */ /* private function parseCopy():void { var readFlag:Boolean=false while(socket.bytesAvailable) { if(!readFlag && socket.bytesAvailable>=headLen) { dataLength=socket.readInt() readFlag=true } if(dataLength==0) { dispatchEvent(new NetEvent(NetEvent.NULL_STREAM)) return } if(readFlag && socket.bytesAvailable>=dataLength) { byteBuffer.position=0 socket.readByte(byteBuffer,0,dataLength) var event:NetEvent(NetEvent.READED_DATA) event.bytesData=byteBuffer dispatchEvent(event) dataLength=0 readFlag=false } } } */ private function parse():void{ //开始读取数据的标记 var readFlag:Boolean = false; //每读取一条数据bytesAvailable值将随之减少 //while (socket.bytesAvailable>=headLen) { while (socket.bytesAvailable) { if (!readFlag && socket.bytesAvailable >= headLen) { //读取数据长度 dataLength = socket.readInt(); readFlag = true; } //如果为0,表示收到异常消息,避免无限循环地等待 if(dataLength==0){ dispatchEvent(new NetEvent(NetEvent.NULL_STREAM)); return; } //数据流里的数据满足条件,开始读数据 if (readFlag && socket.bytesAvailable >= dataLength){ //指针回归 byteBuffer.position = 0; //取出指定长度的字节 socket.readBytes(byteBuffer, 0, dataLength); //读取完一条消息后发送消息内容 var event:NetEvent=new NetEvent(NetEvent.READED_DATA); event.bytesData=byteBuffer; dispatchEvent(event); dataLength = 0; readFlag = false; } } } /** * 发送数据 * @param bytes */ public function send(bytes:ByteArray):void{ var bytes_len:int = bytes.length; //数据的长度 socket.writeInt(bytes_len); //发送的数据内容 socket.writeBytes(bytes); socket.flush(); } /** * 发送自定义数据包 * @param packet */ public function sendPacket(packet:Packet):void { send(packet.array()); } } } 代码分别打开flex builder和eclipse,运行java服务端,然后运行as3客户端 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2012-06-11
as 部分没有 writeLong 和 readLong 呀。
|
|
返回顶楼 | |