最近一个项目中用到了Java解码,主要采用的是Mina框架,现将遇到的问题总结一下,以备后查:
终端是用C编码,通过CAN中转,最后转成TCP送出,用Java写了个服务端,接收并解析入库
一、字节序的问题
关于字节序,请见 http://zh.wikipedia.org/wiki/%E5%AD%97%E8%8A%82%E5%BA%8F
C和Java不一样,所以代码中要这样设置一下
[java] view plaincopyprint?
01. @Override
02. protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
03. in.order(ByteOrder.LITTLE_ENDIAN);
04. //...
05.}
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
in.order(ByteOrder.LITTLE_ENDIAN);
//...
}
二、数据类型的问题
C,C++中有unsigned类型,Java中没有,所以对于一个字节,C中可以存储unsigned类型,即0到255,而java对应的是byte,即-128到127
所以要做转换
[java] view plaincopyprint?
01.byte cur_b = in.get(); //这个是byte
02.int cur_i = cur_b & 0xff; //做运算时要转成整形
byte cur_b = in.get(); //这个是byte
int cur_i = cur_b & 0xff; //做运算时要转成整形
三、丰富的字节转换函数
除了上面的强转之外,Mina的IoBuffer中提供了很多方便的转换api,能将一个或多个字节的二进制数据方便地转成这种类型
[java] view plaincopyprint?
01.get()
02.get(byte[])
03.get(byte[], int, int)
04.get(int)
05.getChar()
06.getChar(int)
07.getDouble()
08.getDouble(int)
09.getEnum(int, Class<E>)
10.getEnum(Class<E>)
11.getEnumInt(int, Class<E>)
12.getEnumInt(Class<E>)
13.getEnumSet(int, Class<E>)
14.getEnumSet(Class<E>)
15.getEnumSetInt(int, Class<E>)
16.getEnumSetInt(Class<E>)
17.getEnumSetLong(int, Class<E>)
18.getEnumSetLong(Class<E>)
19.getEnumSetShort(int, Class<E>)
20.getEnumSetShort(Class<E>)
21.getEnumShort(int, Class<E>)
22.getEnumShort(Class<E>)
23.getFloat()
24.getFloat(int)
25.getHexDump()
26.getHexDump(int)
27.getInt()
28.getInt(int)
29.getLong()
30.getLong(int)
31.getMediumInt()
32.getMediumInt(int)
33.getObject()
34.getObject(ClassLoader)
35.getPrefixedString(int, CharsetDecoder)
36.getPrefixedString(CharsetDecoder)
37.getShort()
38.getShort(int)
39.getSlice(int)
40.getSlice(int, int)
41.getString(int, CharsetDecoder)
42.getString(CharsetDecoder)
43.getUnsigned()
44.getUnsigned(int)
45.getUnsignedInt()
46.getUnsignedInt(int)
47.getUnsignedMediumInt()
48.getUnsignedMediumInt(int)
49.getUnsignedShort()
50.getUnsignedShort(int)
get()
get(byte[])
get(byte[], int, int)
get(int)
getChar()
getChar(int)
getDouble()
getDouble(int)
getEnum(int, Class<E>)
getEnum(Class<E>)
getEnumInt(int, Class<E>)
getEnumInt(Class<E>)
getEnumSet(int, Class<E>)
getEnumSet(Class<E>)
getEnumSetInt(int, Class<E>)
getEnumSetInt(Class<E>)
getEnumSetLong(int, Class<E>)
getEnumSetLong(Class<E>)
getEnumSetShort(int, Class<E>)
getEnumSetShort(Class<E>)
getEnumShort(int, Class<E>)
getEnumShort(Class<E>)
getFloat()
getFloat(int)
getHexDump()
getHexDump(int)
getInt()
getInt(int)
getLong()
getLong(int)
getMediumInt()
getMediumInt(int)
getObject()
getObject(ClassLoader)
getPrefixedString(int, CharsetDecoder)
getPrefixedString(CharsetDecoder)
getShort()
getShort(int)
getSlice(int)
getSlice(int, int)
getString(int, CharsetDecoder)
getString(CharsetDecoder)
getUnsigned()
getUnsigned(int)
getUnsignedInt()
getUnsignedInt(int)
getUnsignedMediumInt()
getUnsignedMediumInt(int)
getUnsignedShort()
getUnsignedShort(int)
基本上足够使用了
四、处理断包等问题,只要解码器Decoder extends CumulativeProtocolDecoder即可
/*
A ProtocolDecoder that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.
If the received IoBuffer is only a part of a message. decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
Here is an example decoder that decodes CRLF terminated lines into Command objects:
*/
[java] view plaincopyprint?
01.public class CrLfTerminatedCommandLineDecoder
02. extends CumulativeProtocolDecoder {
03.
04.
05. private Command parseCommand(IoBuffer in) {
06. // Convert the bytes in the specified buffer to a
07. // Command object.
08. ...
09. }
10.
11.
12. protected boolean doDecode(
13. IoSession session, IoBuffer in, ProtocolDecoderOutput out)
14. throws Exception {
15.
16.
17. // Remember the initial position.
18. int start = in.position();
19.
20.
21. // Now find the first CRLF in the buffer.
22. byte previous = 0;
23. while (in.hasRemaining()) {
24. byte current = in.get();
25.
26.
27. if (previous == '\r' && current == '\n') {
28. // Remember the current position and limit.
29. int position = in.position();
30. int limit = in.limit();
31. try {
32. in.position(start);
33. in.limit(position);
34. // The bytes between in.position() and in.limit()
35. // now contain a full CRLF terminated line.
36. out.write(parseCommand(in.slice()));
37. } finally {
38. // Set the position to point right after the
39. // detected line and set the limit to the old
40. // one.
41. in.position(position);
42. in.limit(limit);
43. }
44. // Decoded one line; CumulativeProtocolDecoder will
45. // call me again until I return false. So just
46. // return true until there are no more lines in the
47. // buffer.
48. return true;
49. }
50.
51.
52. previous = current;
53. }
54.
55.
56. // Could not find CRLF in the buffer. Reset the initial
57. // position to the one we recorded above.
58. in.position(start);
59.
60.
61. return false;
62. }
63. }
public class CrLfTerminatedCommandLineDecoder
extends CumulativeProtocolDecoder {
private Command parseCommand(IoBuffer in) {
// Convert the bytes in the specified buffer to a
// Command object.
...
}
protected boolean doDecode(
IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
// Remember the initial position.
int start = in.position();
// Now find the first CRLF in the buffer.
byte previous = 0;
while (in.hasRemaining()) {
byte current = in.get();
if (previous == '\r' && current == '\n') {
// Remember the current position and limit.
int position = in.position();
int limit = in.limit();
try {
in.position(start);
in.limit(position);
// The bytes between in.position() and in.limit()
// now contain a full CRLF terminated line.
out.write(parseCommand(in.slice()));
} finally {
// Set the position to point right after the
// detected line and set the limit to the old
// one.
in.position(position);
in.limit(limit);
}
// Decoded one line; CumulativeProtocolDecoder will
// call me again until I return false. So just
// return true until there are no more lines in the
// buffer.
return true;
}
previous = current;
}
// Could not find CRLF in the buffer. Reset the initial
// position to the one we recorded above.
in.position(start);
return false;
}
}
/*
Please note that this decoder simply forward the call to doDecode(IoSession, IoBuffer, ProtocolDecoderOutput) if the underlying transport doesn't have a packet fragmentation. Whether the transport has fragmentation or not is determined by querying TransportMetadata.
*/
除此之外,还要将未解析完的数据和标志位等放到IoSession中
[java] view plaincopyprint?
01.Context ctx = getContext(session);
02.
03.
04.private Context getContext(IoSession session) {
05. Context context = (Context) session.getAttribute(CONTEXT);
06. if (context == null) {
07. context = new Context();
08. session.setAttribute(CONTEXT, context);
09. }
10. return context;
11.}
Context ctx = getContext(session);
private Context getContext(IoSession session) {
Context context = (Context) session.getAttribute(CONTEXT);
if (context == null) {
context = new Context();
session.setAttribute(CONTEXT, context);
}
return context;
}
五、多个服务(端口)共享Session问题
应用场景比如,一个端口9090专门负责数据接收,另外一个端口8080接收来自web端的指令并传送给终端并返回数据。
原理是在一个主函数中开两个mina服务(端口),但此时两个服务中的IoSession是不能互访的,所以要在主进程中管理他们
[java] view plaincopyprint?
01.public class Main {
02. private static final Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>());//这个至关重要
03. private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
04. private static final int PORT_DTU = 9090;
05. private static final int PORT_WEB = 8080;
06. private static final ProtocolCodecFactory textLineCodecFactory = new TextLineCodecFactory(Charset.forName("UTF-8"));
07.
08. public static void main(String[] args) {
09. try {
10. new MinaxServer(PORT_DTU, textLineCodecFactory, new MinaxDtuIoHandler()).start();
11. LOGGER.info("Server started at port {}.", PORT_DTU);
12.
13. new MinaxServer(PORT_WEB, textLineCodecFactory, new MinaxWebIoHandler()).start();
14. LOGGER.info("Server started at port {}.", PORT_WEB);
15.
16. } catch (IOException ioe) {
17. LOGGER.error("Can't start server!", ioe);
18. }
19.
20. while (true) {
21. System.out.printf("session count:%d\n", Main.getSessions().size());
22. try {
23. Thread.sleep(10000);
24. } catch (InterruptedException e) {
25. e.printStackTrace();
26. }
27. }
28. }
29.
30. public static Set<IoSession> getSessions() {
31. return sessions;
32. }
33.}
public class Main {
private static final Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>());//这个至关重要
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
private static final int PORT_DTU = 9090;
private static final int PORT_WEB = 8080;
private static final ProtocolCodecFactory textLineCodecFactory = new TextLineCodecFactory(Charset.forName("UTF-8"));
public static void main(String[] args) {
try {
new MinaxServer(PORT_DTU, textLineCodecFactory, new MinaxDtuIoHandler()).start();
LOGGER.info("Server started at port {}.", PORT_DTU);
new MinaxServer(PORT_WEB, textLineCodecFactory, new MinaxWebIoHandler()).start();
LOGGER.info("Server started at port {}.", PORT_WEB);
} catch (IOException ioe) {
LOGGER.error("Can't start server!", ioe);
}
while (true) {
System.out.printf("session count:%d\n", Main.getSessions().size());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static Set<IoSession> getSessions() {
return sessions;
}
}
之后在传送端的IoHandler中加以管理,即在SessionCreated时,加入到Set中,sessionClosed的时候从Set中Remove掉
[java] view plaincopyprint?
01.@Override
02.public void sessionCreated(IoSession session) throws Exception {
03. // super.sessionCreated(session);
04. Main.getSessions().add(session);
05. // String message = String.format("IP:%s, Welcome to dtu server\n", session.getRemoteAddress());
06. // session.write(message);
07. logger.debug("{}:session[{}]Created...", session.getRemoteAddress(), session.getId());
08.}
09.
10.@Override
11.public void sessionOpened(IoSession session) throws Exception {
12. // super.sessionOpened(session);
13. logger.debug("sessionOpened...");
14.}
15.
16.@Override
17.public void sessionClosed(IoSession session) throws Exception {
18. Main.getSessions().remove(session);
19. // super.sessionClosed(session);
20. logger.debug("sessionClosed...");
21.}
@Override
public void sessionCreated(IoSession session) throws Exception {
// super.sessionCreated(session);
Main.getSessions().add(session);
// String message = String.format("IP:%s, Welcome to dtu server\n", session.getRemoteAddress());
// session.write(message);
logger.debug("{}:session[{}]Created...", session.getRemoteAddress(), session.getId());
}
@Override
public void sessionOpened(IoSession session) throws Exception {
// super.sessionOpened(session);
logger.debug("sessionOpened...");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
Main.getSessions().remove(session);
// super.sessionClosed(session);
logger.debug("sessionClosed...");
}
在Web端访问IoSession
[java] view plaincopyprint?
01.@Override
02. public void messageReceived(IoSession session, Object message) throws Exception {
03. logger.debug("messageReceived...");
04. logger.debug("...message:{}", message);
05. String jsonMessage = message.toString();
06.
07. JSONObject o = JSON.parseObject(jsonMessage);
08. Integer dtu_id = o.getInteger("dtu_id");
09. Long session_id = o.getLong("session_id");
10. String action = o.getString("action");
11. String params = o.getString("params");
12.
13. action = null == action ? "" : action.toLowerCase();
14. JSONObject p = JSON.parseObject(params);
15.
16. Set<IoSession> sessions = Main.getSessions();//从主线程中取得session
17. switch (action) {
18. case "quit":
19. session.close(true);
20. break;
21. case "get_session_count":
22. session.write(sessions.size());
23. break;
24. case "broadcast":
25. String msg_bc = null == p ? null : p.getString("message");
26. if (null == msg_bc || msg_bc.length() == 0) {
27. msg_bc = "hello dtu!";
28. }
29. synchronized (sessions) {//注意同步
30. for (IoSession sess : sessions) {
31. // if (session.hashCode() == sess.hashCode()) {
32. // continue;
33. // }
34. if (sess.isConnected()) {
35. sess.write(msg_bc);
36. }
37. }
38. }
39. break;
40. default:
41. session.write("UNKOWN COMMAND");
42. break;
43. }
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
logger.debug("messageReceived...");
logger.debug("...message:{}", message);
String jsonMessage = message.toString();
JSONObject o = JSON.parseObject(jsonMessage);
Integer dtu_id = o.getInteger("dtu_id");
Long session_id = o.getLong("session_id");
String action = o.getString("action");
String params = o.getString("params");
action = null == action ? "" : action.toLowerCase();
JSONObject p = JSON.parseObject(params);
Set<IoSession> sessions = Main.getSessions();//从主线程中取得session
switch (action) {
case "quit":
session.close(true);
break;
case "get_session_count":
session.write(sessions.size());
break;
case "broadcast":
String msg_bc = null == p ? null : p.getString("message");
if (null == msg_bc || msg_bc.length() == 0) {
msg_bc = "hello dtu!";
}
synchronized (sessions) {//注意同步
for (IoSession sess : sessions) {
// if (session.hashCode() == sess.hashCode()) {
// continue;
// }
if (sess.isConnected()) {
sess.write(msg_bc);
}
}
}
break;
default:
session.write("UNKOWN COMMAND");
break;
}
六、Web端的处理
对于Server接收端,一般采用长连接,http是无状态的,例如我发送了一个指令以取得在线的终端数据,发送到Server后,通过IoHandler处理,返回数据后在页面显示,这个连接就没有必要再保持了,可以直接关掉。相关代码
[java] view plaincopyprint?
01.@RequestMapping(value = "/send", produces = "text/html;charset=UTF-8", method = RequestMethod.POST)
02.@ResponseBody
03.public String send(@RequestParam String cmd) {
04. logger.debug("...cmd:{}", cmd);
05.
06. // 创建客户端连接器
07. IoConnector connector = new NioSocketConnector();
08. // 设置事件处理器
09. connector.setHandler(new ClientIoHandler());
10. // 设置编码过滤器和按行读取数据模式
11. connector.getFilterChain().addLast("codec",
12. new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
13. // 创建连接
14. ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));
15. // 等待连接创建完成
16. future.awaitUninterruptibly();
17. // 获取连接会话
18. IoSession session = future.getSession();
19. // 产生当次发送请求标识,标识由客户端地址+随机数
20. // 这里的标识只是一个例子,标识可以任何方法产生,只要保持在系统中的唯一性
21. String flag = UUID.randomUUID().toString();
22. logger.debug("...flag:{}", flag);
23. // 将标识保存到当前session中
24. session.setAttribute(ClientIoHandler.SESSION_KEY, flag);
25. // 向服务器发送命令信息
26. session.write(cmd);
27. // 等待连接断开
28. session.getCloseFuture().awaitUninterruptibly();
29. connector.dispose();
30.
31. // 通过标识获取保存的结果
32. Object result = ClientIoHandler.RESULTS.get(flag);
33. // 清除标识内容
34. ClientIoHandler.RESULTS.remove(flag);
35. // 将结果返回客户端
36. return result.toString();
37.}
@RequestMapping(value = "/send", produces = "text/html;charset=UTF-8", method = RequestMethod.POST)
@ResponseBody
public String send(@RequestParam String cmd) {
logger.debug("...cmd:{}", cmd);
// 创建客户端连接器
IoConnector connector = new NioSocketConnector();
// 设置事件处理器
connector.setHandler(new ClientIoHandler());
// 设置编码过滤器和按行读取数据模式
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
// 创建连接
ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));
// 等待连接创建完成
future.awaitUninterruptibly();
// 获取连接会话
IoSession session = future.getSession();
// 产生当次发送请求标识,标识由客户端地址+随机数
// 这里的标识只是一个例子,标识可以任何方法产生,只要保持在系统中的唯一性
String flag = UUID.randomUUID().toString();
logger.debug("...flag:{}", flag);
// 将标识保存到当前session中
session.setAttribute(ClientIoHandler.SESSION_KEY, flag);
// 向服务器发送命令信息
session.write(cmd);
// 等待连接断开
session.getCloseFuture().awaitUninterruptibly();
connector.dispose();
// 通过标识获取保存的结果
Object result = ClientIoHandler.RESULTS.get(flag);
// 清除标识内容
ClientIoHandler.RESULTS.remove(flag);
// 将结果返回客户端
return result.toString();
}
ClientIoHandler:
[java] view plaincopyprint?
01.public class ClientIoHandler extends IoHandlerAdapter {
02. private final Logger logger = LoggerFactory.getLogger(ClientIoHandler.class);
03. public static final String SESSION_KEY = "com.company.project.client.clientiohandle.SESSION_KEY";
04. public static final Map<String, Object> RESULTS = new ConcurrentHashMap<String, Object>();
05.
06. public void messageReceived(IoSession session, Object message) throws Exception {
07. logger.debug("messageReceived...");
08. // 从session中取到标识
09. String flag = (String) session.getAttribute(SESSION_KEY);
10. logger.debug("...flag:{}", flag);
11. // 将从服务端接收到的message和标识绑定保存,可以保存到内存、文件、数据库等等
12. // 在这里简单的以标识为key,message为value保存到Map中
13. RESULTS.put(flag, message);
14. // 关闭session
15. session.close(true);
16. }
17.}
public class ClientIoHandler extends IoHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(ClientIoHandler.class);
public static final String SESSION_KEY = "com.company.project.client.clientiohandle.SESSION_KEY";
public static final Map<String, Object> RESULTS = new ConcurrentHashMap<String, Object>();
public void messageReceived(IoSession session, Object message) throws Exception {
logger.debug("messageReceived...");
// 从session中取到标识
String flag = (String) session.getAttribute(SESSION_KEY);
logger.debug("...flag:{}", flag);
// 将从服务端接收到的message和标识绑定保存,可以保存到内存、文件、数据库等等
// 在这里简单的以标识为key,message为value保存到Map中
RESULTS.put(flag, message);
// 关闭session
session.close(true);
}
}
后继再补充...
终端是用C编码,通过CAN中转,最后转成TCP送出,用Java写了个服务端,接收并解析入库
一、字节序的问题
关于字节序,请见 http://zh.wikipedia.org/wiki/%E5%AD%97%E8%8A%82%E5%BA%8F
C和Java不一样,所以代码中要这样设置一下
[java] view plaincopyprint?
01. @Override
02. protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
03. in.order(ByteOrder.LITTLE_ENDIAN);
04. //...
05.}
@Override
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
in.order(ByteOrder.LITTLE_ENDIAN);
//...
}
二、数据类型的问题
C,C++中有unsigned类型,Java中没有,所以对于一个字节,C中可以存储unsigned类型,即0到255,而java对应的是byte,即-128到127
所以要做转换
[java] view plaincopyprint?
01.byte cur_b = in.get(); //这个是byte
02.int cur_i = cur_b & 0xff; //做运算时要转成整形
byte cur_b = in.get(); //这个是byte
int cur_i = cur_b & 0xff; //做运算时要转成整形
三、丰富的字节转换函数
除了上面的强转之外,Mina的IoBuffer中提供了很多方便的转换api,能将一个或多个字节的二进制数据方便地转成这种类型
[java] view plaincopyprint?
01.get()
02.get(byte[])
03.get(byte[], int, int)
04.get(int)
05.getChar()
06.getChar(int)
07.getDouble()
08.getDouble(int)
09.getEnum(int, Class<E>)
10.getEnum(Class<E>)
11.getEnumInt(int, Class<E>)
12.getEnumInt(Class<E>)
13.getEnumSet(int, Class<E>)
14.getEnumSet(Class<E>)
15.getEnumSetInt(int, Class<E>)
16.getEnumSetInt(Class<E>)
17.getEnumSetLong(int, Class<E>)
18.getEnumSetLong(Class<E>)
19.getEnumSetShort(int, Class<E>)
20.getEnumSetShort(Class<E>)
21.getEnumShort(int, Class<E>)
22.getEnumShort(Class<E>)
23.getFloat()
24.getFloat(int)
25.getHexDump()
26.getHexDump(int)
27.getInt()
28.getInt(int)
29.getLong()
30.getLong(int)
31.getMediumInt()
32.getMediumInt(int)
33.getObject()
34.getObject(ClassLoader)
35.getPrefixedString(int, CharsetDecoder)
36.getPrefixedString(CharsetDecoder)
37.getShort()
38.getShort(int)
39.getSlice(int)
40.getSlice(int, int)
41.getString(int, CharsetDecoder)
42.getString(CharsetDecoder)
43.getUnsigned()
44.getUnsigned(int)
45.getUnsignedInt()
46.getUnsignedInt(int)
47.getUnsignedMediumInt()
48.getUnsignedMediumInt(int)
49.getUnsignedShort()
50.getUnsignedShort(int)
get()
get(byte[])
get(byte[], int, int)
get(int)
getChar()
getChar(int)
getDouble()
getDouble(int)
getEnum(int, Class<E>)
getEnum(Class<E>)
getEnumInt(int, Class<E>)
getEnumInt(Class<E>)
getEnumSet(int, Class<E>)
getEnumSet(Class<E>)
getEnumSetInt(int, Class<E>)
getEnumSetInt(Class<E>)
getEnumSetLong(int, Class<E>)
getEnumSetLong(Class<E>)
getEnumSetShort(int, Class<E>)
getEnumSetShort(Class<E>)
getEnumShort(int, Class<E>)
getEnumShort(Class<E>)
getFloat()
getFloat(int)
getHexDump()
getHexDump(int)
getInt()
getInt(int)
getLong()
getLong(int)
getMediumInt()
getMediumInt(int)
getObject()
getObject(ClassLoader)
getPrefixedString(int, CharsetDecoder)
getPrefixedString(CharsetDecoder)
getShort()
getShort(int)
getSlice(int)
getSlice(int, int)
getString(int, CharsetDecoder)
getString(CharsetDecoder)
getUnsigned()
getUnsigned(int)
getUnsignedInt()
getUnsignedInt(int)
getUnsignedMediumInt()
getUnsignedMediumInt(int)
getUnsignedShort()
getUnsignedShort(int)
基本上足够使用了
四、处理断包等问题,只要解码器Decoder extends CumulativeProtocolDecoder即可
/*
A ProtocolDecoder that cumulates the content of received buffers to a cumulative buffer to help users implement decoders.
If the received IoBuffer is only a part of a message. decoders should cumulate received buffers to make a message complete or to postpone decoding until more buffers arrive.
Here is an example decoder that decodes CRLF terminated lines into Command objects:
*/
[java] view plaincopyprint?
01.public class CrLfTerminatedCommandLineDecoder
02. extends CumulativeProtocolDecoder {
03.
04.
05. private Command parseCommand(IoBuffer in) {
06. // Convert the bytes in the specified buffer to a
07. // Command object.
08. ...
09. }
10.
11.
12. protected boolean doDecode(
13. IoSession session, IoBuffer in, ProtocolDecoderOutput out)
14. throws Exception {
15.
16.
17. // Remember the initial position.
18. int start = in.position();
19.
20.
21. // Now find the first CRLF in the buffer.
22. byte previous = 0;
23. while (in.hasRemaining()) {
24. byte current = in.get();
25.
26.
27. if (previous == '\r' && current == '\n') {
28. // Remember the current position and limit.
29. int position = in.position();
30. int limit = in.limit();
31. try {
32. in.position(start);
33. in.limit(position);
34. // The bytes between in.position() and in.limit()
35. // now contain a full CRLF terminated line.
36. out.write(parseCommand(in.slice()));
37. } finally {
38. // Set the position to point right after the
39. // detected line and set the limit to the old
40. // one.
41. in.position(position);
42. in.limit(limit);
43. }
44. // Decoded one line; CumulativeProtocolDecoder will
45. // call me again until I return false. So just
46. // return true until there are no more lines in the
47. // buffer.
48. return true;
49. }
50.
51.
52. previous = current;
53. }
54.
55.
56. // Could not find CRLF in the buffer. Reset the initial
57. // position to the one we recorded above.
58. in.position(start);
59.
60.
61. return false;
62. }
63. }
public class CrLfTerminatedCommandLineDecoder
extends CumulativeProtocolDecoder {
private Command parseCommand(IoBuffer in) {
// Convert the bytes in the specified buffer to a
// Command object.
...
}
protected boolean doDecode(
IoSession session, IoBuffer in, ProtocolDecoderOutput out)
throws Exception {
// Remember the initial position.
int start = in.position();
// Now find the first CRLF in the buffer.
byte previous = 0;
while (in.hasRemaining()) {
byte current = in.get();
if (previous == '\r' && current == '\n') {
// Remember the current position and limit.
int position = in.position();
int limit = in.limit();
try {
in.position(start);
in.limit(position);
// The bytes between in.position() and in.limit()
// now contain a full CRLF terminated line.
out.write(parseCommand(in.slice()));
} finally {
// Set the position to point right after the
// detected line and set the limit to the old
// one.
in.position(position);
in.limit(limit);
}
// Decoded one line; CumulativeProtocolDecoder will
// call me again until I return false. So just
// return true until there are no more lines in the
// buffer.
return true;
}
previous = current;
}
// Could not find CRLF in the buffer. Reset the initial
// position to the one we recorded above.
in.position(start);
return false;
}
}
/*
Please note that this decoder simply forward the call to doDecode(IoSession, IoBuffer, ProtocolDecoderOutput) if the underlying transport doesn't have a packet fragmentation. Whether the transport has fragmentation or not is determined by querying TransportMetadata.
*/
除此之外,还要将未解析完的数据和标志位等放到IoSession中
[java] view plaincopyprint?
01.Context ctx = getContext(session);
02.
03.
04.private Context getContext(IoSession session) {
05. Context context = (Context) session.getAttribute(CONTEXT);
06. if (context == null) {
07. context = new Context();
08. session.setAttribute(CONTEXT, context);
09. }
10. return context;
11.}
Context ctx = getContext(session);
private Context getContext(IoSession session) {
Context context = (Context) session.getAttribute(CONTEXT);
if (context == null) {
context = new Context();
session.setAttribute(CONTEXT, context);
}
return context;
}
五、多个服务(端口)共享Session问题
应用场景比如,一个端口9090专门负责数据接收,另外一个端口8080接收来自web端的指令并传送给终端并返回数据。
原理是在一个主函数中开两个mina服务(端口),但此时两个服务中的IoSession是不能互访的,所以要在主进程中管理他们
[java] view plaincopyprint?
01.public class Main {
02. private static final Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>());//这个至关重要
03. private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
04. private static final int PORT_DTU = 9090;
05. private static final int PORT_WEB = 8080;
06. private static final ProtocolCodecFactory textLineCodecFactory = new TextLineCodecFactory(Charset.forName("UTF-8"));
07.
08. public static void main(String[] args) {
09. try {
10. new MinaxServer(PORT_DTU, textLineCodecFactory, new MinaxDtuIoHandler()).start();
11. LOGGER.info("Server started at port {}.", PORT_DTU);
12.
13. new MinaxServer(PORT_WEB, textLineCodecFactory, new MinaxWebIoHandler()).start();
14. LOGGER.info("Server started at port {}.", PORT_WEB);
15.
16. } catch (IOException ioe) {
17. LOGGER.error("Can't start server!", ioe);
18. }
19.
20. while (true) {
21. System.out.printf("session count:%d\n", Main.getSessions().size());
22. try {
23. Thread.sleep(10000);
24. } catch (InterruptedException e) {
25. e.printStackTrace();
26. }
27. }
28. }
29.
30. public static Set<IoSession> getSessions() {
31. return sessions;
32. }
33.}
public class Main {
private static final Set<IoSession> sessions = Collections.synchronizedSet(new HashSet<IoSession>());//这个至关重要
private static final Logger LOGGER = LoggerFactory.getLogger(Main.class);
private static final int PORT_DTU = 9090;
private static final int PORT_WEB = 8080;
private static final ProtocolCodecFactory textLineCodecFactory = new TextLineCodecFactory(Charset.forName("UTF-8"));
public static void main(String[] args) {
try {
new MinaxServer(PORT_DTU, textLineCodecFactory, new MinaxDtuIoHandler()).start();
LOGGER.info("Server started at port {}.", PORT_DTU);
new MinaxServer(PORT_WEB, textLineCodecFactory, new MinaxWebIoHandler()).start();
LOGGER.info("Server started at port {}.", PORT_WEB);
} catch (IOException ioe) {
LOGGER.error("Can't start server!", ioe);
}
while (true) {
System.out.printf("session count:%d\n", Main.getSessions().size());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static Set<IoSession> getSessions() {
return sessions;
}
}
之后在传送端的IoHandler中加以管理,即在SessionCreated时,加入到Set中,sessionClosed的时候从Set中Remove掉
[java] view plaincopyprint?
01.@Override
02.public void sessionCreated(IoSession session) throws Exception {
03. // super.sessionCreated(session);
04. Main.getSessions().add(session);
05. // String message = String.format("IP:%s, Welcome to dtu server\n", session.getRemoteAddress());
06. // session.write(message);
07. logger.debug("{}:session[{}]Created...", session.getRemoteAddress(), session.getId());
08.}
09.
10.@Override
11.public void sessionOpened(IoSession session) throws Exception {
12. // super.sessionOpened(session);
13. logger.debug("sessionOpened...");
14.}
15.
16.@Override
17.public void sessionClosed(IoSession session) throws Exception {
18. Main.getSessions().remove(session);
19. // super.sessionClosed(session);
20. logger.debug("sessionClosed...");
21.}
@Override
public void sessionCreated(IoSession session) throws Exception {
// super.sessionCreated(session);
Main.getSessions().add(session);
// String message = String.format("IP:%s, Welcome to dtu server\n", session.getRemoteAddress());
// session.write(message);
logger.debug("{}:session[{}]Created...", session.getRemoteAddress(), session.getId());
}
@Override
public void sessionOpened(IoSession session) throws Exception {
// super.sessionOpened(session);
logger.debug("sessionOpened...");
}
@Override
public void sessionClosed(IoSession session) throws Exception {
Main.getSessions().remove(session);
// super.sessionClosed(session);
logger.debug("sessionClosed...");
}
在Web端访问IoSession
[java] view plaincopyprint?
01.@Override
02. public void messageReceived(IoSession session, Object message) throws Exception {
03. logger.debug("messageReceived...");
04. logger.debug("...message:{}", message);
05. String jsonMessage = message.toString();
06.
07. JSONObject o = JSON.parseObject(jsonMessage);
08. Integer dtu_id = o.getInteger("dtu_id");
09. Long session_id = o.getLong("session_id");
10. String action = o.getString("action");
11. String params = o.getString("params");
12.
13. action = null == action ? "" : action.toLowerCase();
14. JSONObject p = JSON.parseObject(params);
15.
16. Set<IoSession> sessions = Main.getSessions();//从主线程中取得session
17. switch (action) {
18. case "quit":
19. session.close(true);
20. break;
21. case "get_session_count":
22. session.write(sessions.size());
23. break;
24. case "broadcast":
25. String msg_bc = null == p ? null : p.getString("message");
26. if (null == msg_bc || msg_bc.length() == 0) {
27. msg_bc = "hello dtu!";
28. }
29. synchronized (sessions) {//注意同步
30. for (IoSession sess : sessions) {
31. // if (session.hashCode() == sess.hashCode()) {
32. // continue;
33. // }
34. if (sess.isConnected()) {
35. sess.write(msg_bc);
36. }
37. }
38. }
39. break;
40. default:
41. session.write("UNKOWN COMMAND");
42. break;
43. }
@Override
public void messageReceived(IoSession session, Object message) throws Exception {
logger.debug("messageReceived...");
logger.debug("...message:{}", message);
String jsonMessage = message.toString();
JSONObject o = JSON.parseObject(jsonMessage);
Integer dtu_id = o.getInteger("dtu_id");
Long session_id = o.getLong("session_id");
String action = o.getString("action");
String params = o.getString("params");
action = null == action ? "" : action.toLowerCase();
JSONObject p = JSON.parseObject(params);
Set<IoSession> sessions = Main.getSessions();//从主线程中取得session
switch (action) {
case "quit":
session.close(true);
break;
case "get_session_count":
session.write(sessions.size());
break;
case "broadcast":
String msg_bc = null == p ? null : p.getString("message");
if (null == msg_bc || msg_bc.length() == 0) {
msg_bc = "hello dtu!";
}
synchronized (sessions) {//注意同步
for (IoSession sess : sessions) {
// if (session.hashCode() == sess.hashCode()) {
// continue;
// }
if (sess.isConnected()) {
sess.write(msg_bc);
}
}
}
break;
default:
session.write("UNKOWN COMMAND");
break;
}
六、Web端的处理
对于Server接收端,一般采用长连接,http是无状态的,例如我发送了一个指令以取得在线的终端数据,发送到Server后,通过IoHandler处理,返回数据后在页面显示,这个连接就没有必要再保持了,可以直接关掉。相关代码
[java] view plaincopyprint?
01.@RequestMapping(value = "/send", produces = "text/html;charset=UTF-8", method = RequestMethod.POST)
02.@ResponseBody
03.public String send(@RequestParam String cmd) {
04. logger.debug("...cmd:{}", cmd);
05.
06. // 创建客户端连接器
07. IoConnector connector = new NioSocketConnector();
08. // 设置事件处理器
09. connector.setHandler(new ClientIoHandler());
10. // 设置编码过滤器和按行读取数据模式
11. connector.getFilterChain().addLast("codec",
12. new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
13. // 创建连接
14. ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));
15. // 等待连接创建完成
16. future.awaitUninterruptibly();
17. // 获取连接会话
18. IoSession session = future.getSession();
19. // 产生当次发送请求标识,标识由客户端地址+随机数
20. // 这里的标识只是一个例子,标识可以任何方法产生,只要保持在系统中的唯一性
21. String flag = UUID.randomUUID().toString();
22. logger.debug("...flag:{}", flag);
23. // 将标识保存到当前session中
24. session.setAttribute(ClientIoHandler.SESSION_KEY, flag);
25. // 向服务器发送命令信息
26. session.write(cmd);
27. // 等待连接断开
28. session.getCloseFuture().awaitUninterruptibly();
29. connector.dispose();
30.
31. // 通过标识获取保存的结果
32. Object result = ClientIoHandler.RESULTS.get(flag);
33. // 清除标识内容
34. ClientIoHandler.RESULTS.remove(flag);
35. // 将结果返回客户端
36. return result.toString();
37.}
@RequestMapping(value = "/send", produces = "text/html;charset=UTF-8", method = RequestMethod.POST)
@ResponseBody
public String send(@RequestParam String cmd) {
logger.debug("...cmd:{}", cmd);
// 创建客户端连接器
IoConnector connector = new NioSocketConnector();
// 设置事件处理器
connector.setHandler(new ClientIoHandler());
// 设置编码过滤器和按行读取数据模式
connector.getFilterChain().addLast("codec",
new ProtocolCodecFilter(new TextLineCodecFactory(Charset.forName("UTF-8"))));
// 创建连接
ConnectFuture future = connector.connect(new InetSocketAddress("localhost", 8080));
// 等待连接创建完成
future.awaitUninterruptibly();
// 获取连接会话
IoSession session = future.getSession();
// 产生当次发送请求标识,标识由客户端地址+随机数
// 这里的标识只是一个例子,标识可以任何方法产生,只要保持在系统中的唯一性
String flag = UUID.randomUUID().toString();
logger.debug("...flag:{}", flag);
// 将标识保存到当前session中
session.setAttribute(ClientIoHandler.SESSION_KEY, flag);
// 向服务器发送命令信息
session.write(cmd);
// 等待连接断开
session.getCloseFuture().awaitUninterruptibly();
connector.dispose();
// 通过标识获取保存的结果
Object result = ClientIoHandler.RESULTS.get(flag);
// 清除标识内容
ClientIoHandler.RESULTS.remove(flag);
// 将结果返回客户端
return result.toString();
}
ClientIoHandler:
[java] view plaincopyprint?
01.public class ClientIoHandler extends IoHandlerAdapter {
02. private final Logger logger = LoggerFactory.getLogger(ClientIoHandler.class);
03. public static final String SESSION_KEY = "com.company.project.client.clientiohandle.SESSION_KEY";
04. public static final Map<String, Object> RESULTS = new ConcurrentHashMap<String, Object>();
05.
06. public void messageReceived(IoSession session, Object message) throws Exception {
07. logger.debug("messageReceived...");
08. // 从session中取到标识
09. String flag = (String) session.getAttribute(SESSION_KEY);
10. logger.debug("...flag:{}", flag);
11. // 将从服务端接收到的message和标识绑定保存,可以保存到内存、文件、数据库等等
12. // 在这里简单的以标识为key,message为value保存到Map中
13. RESULTS.put(flag, message);
14. // 关闭session
15. session.close(true);
16. }
17.}
public class ClientIoHandler extends IoHandlerAdapter {
private final Logger logger = LoggerFactory.getLogger(ClientIoHandler.class);
public static final String SESSION_KEY = "com.company.project.client.clientiohandle.SESSION_KEY";
public static final Map<String, Object> RESULTS = new ConcurrentHashMap<String, Object>();
public void messageReceived(IoSession session, Object message) throws Exception {
logger.debug("messageReceived...");
// 从session中取到标识
String flag = (String) session.getAttribute(SESSION_KEY);
logger.debug("...flag:{}", flag);
// 将从服务端接收到的message和标识绑定保存,可以保存到内存、文件、数据库等等
// 在这里简单的以标识为key,message为value保存到Map中
RESULTS.put(flag, message);
// 关闭session
session.close(true);
}
}
后继再补充...
相关推荐
- 编解码器是mina中的核心组件,分为编码器(Encoder)和解码器(Decoder)两部分,分别处理数据的编码和解码过程。 - 编码器将应用对象转化为ByteBuffer,以便于网络传输;解码器则将接收到的ByteBuffer还原为...
下面我们将深入探讨Java Mina的关键特性、主要组件以及如何在实际项目中使用它们。 1. **核心组件**: - **Acceptor**:负责监听网络连接请求,并在新连接到来时创建一个Session来处理。 - **Session**:表示网络...
在Java的网络编程中,Apache Mina是一个非常重要的框架,它提供了一个高度可扩展和高性能的网络应用程序开发基础。Mina允许开发者构建基于TCP、UDP等传输协议的服务器端和客户端应用,而无需关注底层的Socket编程...
数据的编码和解码是使用Mina时最核心的部分,因为它涉及到数据在网络中的传输格式。 IoHandler接口是编写业务逻辑的地方,接收和发送数据均在此进行。在实际使用中,你需要实现此接口来处理来自客户端的请求和发送...
《mina多路分离解码详解》 mina框架是Apache软件基金会的一个开源项目,它为Java开发者提供了一套高效、功能丰富的网络...对于需要处理大量并发连接的网络应用,mina框架和它的多路分离解码功能无疑是理想的解决方案。
Java Mina框架是一款高度可扩展且高性能的网络应用开发框架,专为开发网络服务和协议处理应用程序而设计。它提供了一种简洁、高效的API,使得开发者可以轻松地创建基于TCP/IP和UDP/IP协议的服务器和客户端应用。Mina...
MINA(Java Multicast Network Application Framework)是Apache软件基金会开发的一个网络应用框架,主要用于构建高性能、高可用性的网络服务器。这个框架提供了丰富的网络通信API,支持多种协议,如TCP/IP、UDP/IP...
java mina 通讯框架
源码分析还能帮助我们理解Java的多线程、并发控制和事件驱动编程等高级特性,提升我们的编程技能和解决问题的能力。 此外,对于Java开发者来说,熟悉Mina2源码有助于理解其他类似的网络通信框架,比如Netty,因为...
通过分析这些代码,我们可以深入理解Mina如何处理网络通信中的编码解码问题,以及如何根据具体需求设计和实现自定义的`Codec`。 总的来说,Mina编解码示例是一个很好的学习资源,帮助开发者掌握如何在实际项目中...
JAVA Mina框架是一款高度可扩展、高性能的网络应用开发框架,专为Java平台设计。它提供了丰富的网络通信API,使得开发者能够轻松地构建基于TCP/IP、UDP/IP以及其他协议的服务器和客户端应用程序。Mina框架的核心设计...
这是一个有关Mina在Java通信中运用的简单的入门实例,MIna自带一种触发机制,无需再开线程等待收发数据。这个实例中有客户端和服务端,与软件TCPUDPDbg进行文字通话测试。用的时候注意添加Mina包,此实例中用到的是...
html5协议websocket与java服务器的一个简单聊天应用,服务器使用了mina框架,代码中对websocket数据交互协议进行了注释说明,MinaEncoder类与MinaDecoder类对应数据的编码与解码。
5. **强大的社区支持**:MINA背后有活跃的开源社区,提供了大量的文档、示例和问题解答,方便开发者快速入门和解决问题。 6. **版本稳定**:MINA的版本号为2.0.5,这意味着它经过了多次迭代和优化,稳定性较好,...
在“Java Mina实例”项目中,你将学习如何使用Mina来创建一个简单的网络应用。首先,你需要确保已经安装了Eclipse集成开发环境,并且下载了包含这个实例项目的压缩包。压缩包中的文件很可能是源代码和必要的依赖库,...
在Java Mina中,Server是服务端,它监听特定的端口,等待客户端连接。客户端(Client)则通过连接服务器来发送和接收数据。以下是对Mina核心组件和相关知识点的详细解释: 1. **IoAcceptor**:这是Mina服务器的核心...
在本文中,我们将深入探讨MINA框架中的编解码器(Codec)概念,并通过提供的`mina_server`和`minaclient`两个文件夹中的示例,来理解其工作原理。 MINA框架的核心在于它的异步I/O模型,它允许开发者以非阻塞的方式...
总的来说,Apache Mina为解决网络通信中的断包和粘包问题提供了强大的工具。通过理解和应用mina_optimize代码,你可以更好地掌握如何在实际项目中规避这些问题,提升网络通信的稳定性和可靠性。在实际开发中,结合...
在本文中,我们将深入探讨如何利用Java-Mina实现服务端的主动推送功能,结合Spring进行应用集成。 首先,理解Java-Mina的核心概念至关重要。Mina提供了基于I/O多路复用技术(如NIO)的API,使得开发者可以轻松处理...