锁定老帖子 主题:基于netty写的网络通信框架
精华帖 (0) :: 良好帖 (2) :: 新手帖 (0) :: 隐藏帖 (2)
|
|
---|---|
作者 | 正文 |
发表时间:2012-06-05
最后修改:2012-06-25
但是又有一个问题,调用的兄弟需要在web中请求这种tcp服务,netty内部是异步处理机制,http是伪长连接,调用结束后,异步请求还没有返回,http连接就断开了,返回的是null。所以这个问题要解决一下。 下面说下封装的各个类的代码吧 首先当客户端对远程服务器发起tcp请求时,这时候请求一般会到达服务器端的handler里,我写的这个handler继承了netty的SimpleChannelUpstreamHandler,代码如下: public abstract class ChannelServerHandler extends SimpleChannelUpstreamHandler { private final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelServerHandler.class); protected final Map<String, InvokeHandler> handlers = new HashMap<String, InvokeHandler>(); protected final Map<String, Method> initMethods = new HashMap<String, Method>(); public ChannelServerHandler() { WSCFInit.register(handlers, initMethods); } protected abstract void processor(Channel channel, Object message); @Override public final void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { Transport t = (Transport) e.getMessage(); String className = t.getClazz(); String methodName = t.getMethod(); logger.info("Invoke Handler:" + className + ", Invoke Method:" + methodName); processor(ctx.getChannel(), t); } 里面有几个变量需要解释一下,handlers是开发tcp服务端的handler存放的map,initMethods是里面需要调用的方法,通过WSCFInit类来进行初始化工作。 它主要做了如下工作,在服务器端Server启动的时候,扫描固定包下的handler和他们的方法,然后以clazz+method的方式存放在handlers和initMthods这两个map中。 Reflections reflections = new Reflections("packagename"); Set<Class<?>> annotated = reflections.getTypesAnnotatedWith((Class<? extends Annotation>) annClass); Iterator<Class<?>> it = annotated.iterator(); while (it.hasNext()) { Class<?> next = it.next(); if (next.isAnnotationPresent(Handler.class)) { Annotation ann = (Annotation) next.getAnnotation((Class<? extends Annotation>) annClass); handlers.put(((Handler) ann).name(), (InvokeHandler) next.newInstance()); Method[] methods = next.getDeclaredMethods(); for (Method method : methods) { if (method.isAnnotationPresent(Remote.class)) { Remote path = method.getAnnotation(Remote.class); initMethods.put(((Handler) ann).name() + path.url(), method); } } } } protected abstract void processor(Channel channel, Object message);这个方法具体的逻辑是由它的子类来处理的。 再看一下ServerHandler类里面processor的代码,这个类继承了ChannelServerHandler @Override protected void processor(Channel channel, Object message) { Transport transport = (Transport) message; InvokeHandler handler = handlers.get(transport.getClazz()); Object[] params = (Object[]) transport.getMessage(); Object ret = null; try { Method method = initMethods.get(transport.getClazz() + transport.getMethod()); if (method == null) { } else { ret = method.invoke(handler, params); } ServerSender sender = new ServerSender(channel, transport); sender.send(ret); } catch (Exception e) { throw new IllegalAccessError(e.getMessage()); } } 客户端向服务器端发起的请求真正处理的逻辑在这个方法里面,这个方法在处理完调用了相应的服务端handler进行响应后,会将需要返回给客户端的信息封装在transport这个对象然后传递出去,这个对象是封装服务器和客户端通信消息的。 那么Transport这个类定义了些什么内容呢 public final class Transport implements Serializable { private static final long serialVersionUID = 1675991188209117209L; private String clazz; private String method; private Object message; private String key; clazz是要调用的handler的注解名,method是要调用的方法的注解名,message是封装通信消息的,key代表一个token,客户端将这个token发给服务器端,服务器端根据这个token进行查找,最后将token和处理结果一起返回给客户端。 ok,处理服务端信息的handler看完了,我们再来看看客户端的 public abstract class ChannelClientHandler extends SimpleChannelUpstreamHandler { private final InternalLogger logger = InternalLoggerFactory.getInstance(ChannelClientHandler.class); public final Map<String, ResultHandler> ret = new ConcurrentHashMap<String, ResultHandler>(); protected abstract void processor(Channel channel, Object message); @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { processor(ctx.getChannel(), e.getMessage()); } } ret是封装服务器端返回结果的,它的子类负责实现processor方法。 子类代码如下 @Override protected void processor(Channel channel, Object message) { Transport t = (Transport) message; String key = t.getClazz() + t.getMethod() + t.getKey(); ResultHandler r = ret.remove(key); r.processor(t.getMessage()); } ResultHandler这个是我定义的一个接口,专门处理异步返回的结果,可以通过匿名函数调用 public interface ResultHandler<T> { public void processor(T message); } 上面说了如何处理netty内部的异步机制,让主线程能够等待异步返回的结果 处理代码如下 public Object get(String url, Object... params) { class Result { public Object o; } final Result ret = new Result(); synchronized (ret) { try { invoke(url, params, new ResultHandler() { @Override public void processor(Object message) { synchronized (ret) { ret.o = message; ret.notify(); } } }); ret.wait(); } catch (InterruptedException e) { } return ret.o; } } 当客户端调用get方法时候,就可以得到服务器端异步返回的结果了。但是对客户端来说,他感觉到的是同步的调用。 最后我定义了一个InvokeHandler,当开发者开发服务端程序时候,需要实现这个接口,定义自己的handler 类似如下 @Handler(name="testhandler") public class Server1 implements InvokeHandler { @Remote(url="test2") public String say(String msg) { System.out.println(msg); return "hi"; } @Remote(url= "test2") public String say2(Person p) { System.out.println(p.getId()); return p.getName(); } } 上面定义的这些注解,在WSCFInit初始化的时候会放到一个map里面,类似于spring的配置文件。 最后再说说客户端是怎么调用的,在连接好服务端ip和port后,通过如下调用方式就可以了 public class Client { private static ClientSender sender; public static void main(String[] args) { sender = ClientProxy.connect(ip, port); Object msg = sender.get("tcp://testhandler/test1", "hello"); System.out.println(msg); Person p = new Person(); p.setId(1); p.setName("zhangsan"); Object o = sender.get("tcp://testhandler/test2", p); System.out.println(o); } } 这样就ok了,也可以自定义要传输的是对象还是xml还是json。同时可以方便的定义自己的解码器。完成自己的业务需求。 接上文,这个服务是基于netty的,每connect一次,就会在服务器上建立一个tcp连接,就是一对pipe,如果不及时释放,那么建立的pipe会越来越多,严重浪费服务器的资源。但是如果释放了,就失去了tcp长连接的作用了。所以折中一下,为了减少连接数,保证客户端的固定连接,服务端不变,在客户端加入连接池功能。 public synchronized ClientSender getClientSender() { Channel channel = getChannel(); if(!channel.isOpen()) { connectPool.remove(channel); channel = createConnect(address, port); connectPool.addLast(channel); } return new ClientSender(channel, handler.ret, this); } ok,这样就可以用到连接池功能了。每个客户端可以用到固定连接数了。 那么客户端调用的时候需要自己动手创建ConnectPool了 public class ClientProxy { private static ConnectPool pool = null; public static void connect(String address, int port) { if (pool == null) { synchronized (ClientProxy.class) { if (pool == null) { pool = ConnectPool.createProxy(address, port); } } } } public static Object get(String url, Object... params) { if(pool == null) { throw new IllegalStateException("must invoke connect method first"); } ClientSender sender = pool.getClientSender(); Object msg = sender.get(url, params); sender.free(); //归还连接 return msg; } } 调用方式就不再是上面那样了,要如下调用: ClientProxy.connect(ip, port); Person p = new Person(); p.setId(12); p.setName("zhangsan"); Object msg = ClientProxy.get("tcp://server/test1", p); System.out.println(msg); 来看下控制台 Jun 19, 2012 3:24:06 PM com.qunar.wscf.pool.ConnectPool INFO: 当前连接池中连接数量:5 Jun 19, 2012 3:24:06 PM com.qunar.wscf.pool.ConnectPool INFO: 连接池中剩余连接数量:4 zhangsan 上次加了连接池功能,后来又提出了一个需求,就是,原来是主线程一直在等待异步线程返回,如果没有返回,主线程就阻塞了,进行不下去。后来发现这个太受限制了。主线程可以先边做自己的事情边等待异步线程处理,符合nio的事件处理机制。于是基于上一点,改造成基于异步的同步。 private static class Result { Object obj; } final Result f = new Result(); private volatile boolean hasNotified = false; //异步线程是否结束的标志 public void preGet(String url, Object... params) { invoke(url, params, new ResultHandler() { @Override public void processor(Object message) { synchronized (f) { f.obj = message; f.notify(); //异步线程结束,唤醒客户端线程 hasNotified = true; } } }); } public Object get() { synchronized (f) { if (!hasNotified) //如果异步线程没有结束,则客户端线程等待 try { f.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } free(); // 释放连接 return f.obj; } 测试如下: ClientProxy.connect(ip, port); Person p = new Person(); p.setId(12); p.setName("zhangsan"); Future future = ClientProxy.get("tcp://server/test1", p); String str = "hello "; //主线程继续做自己的事情,边做边等待异步返回 System.out.println(str + future.get()); 最后打印结果 Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool INFO: 当前连接池中连接数量:5 Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool INFO: 连接池中剩余连接数量:4 Jun 20, 2012 2:26:12 PM com.qunar.wscf.pool.ConnectPool INFO: 归还连接后连接池中连接的数量:5 hello zhangsan 还有一种情况,主线程等待一段时间后,在规定时间内没有返回,主线程就不等待了。 所以代码改造下,加入超时功能 public Object get() { return get(0); } public Object get(long timeout) { synchronized (f) { if (!hasNotified) try { f.wait(timeout); //主线程等待一定时间 } catch (InterruptedException e) { throw new IllegalStateException(e); } } free(); // 释放连接 return f.obj; } 客户端调用,比如1个毫秒没返回,就不再等待了,主线程可以自行处理。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2012-06-05
很优秀的设计,学习了, prefect!!,希望楼主继续更新相关博文,持续关注,3q
|
|
返回顶楼 | |
发表时间:2012-06-07
轻量级远程服务调用,楼主这么好的文章怎么没人支持啊。
不过提两点建议: 1.服务貌似不支持集群和失败重试,不然提供服务的那个节点挂掉了就危险了。 2.服务提供着是直接实例化的,最好能够跟spring集成一下,比如从spring容器中获取service,毕竟现在的中间层都是spring。 |
|
返回顶楼 | |
发表时间:2012-06-07
这个不错,还没细看,有时间拜读一下。
|
|
返回顶楼 | |
发表时间:2012-06-07
很久以前我做项目的时候写过一个和楼主描述很类似的功能,后来想想,为啥不用hessian呢?
|
|
返回顶楼 | |
发表时间:2012-06-07
bbossgroup,RPC很强大!!!
|
|
返回顶楼 | |
发表时间:2012-06-08
rain2005 写道 轻量级远程服务调用,楼主这么好的文章怎么没人支持啊。
不过提两点建议: 1.服务貌似不支持集群和失败重试,不然提供服务的那个节点挂掉了就危险了。 2.服务提供着是直接实例化的,最好能够跟spring集成一下,比如从spring容器中获取service,毕竟现在的中间层都是spring。 谢谢你的建议,我会加以改进,然后再优化后发上来。 ps:怎么会有人投隐藏呢?有什么设计不好的地方可以提出来啊 |
|
返回顶楼 | |
发表时间:2012-06-12
我们项目正在使用rmi,楼主能否大概说一下你们测的rmi性能情况 另外我私底下也在写和你这个差不多的,不过是基于mina的,希望多交流哦
|
|
返回顶楼 | |
发表时间:2012-06-19
最后修改:2012-06-19
楼主 代码能否借来一用
|
|
返回顶楼 | |
发表时间:2012-06-20
ztfjava 写道 楼主 代码能否借来一用
呵呵,可以的话可以站内私信探讨技术。 |
|
返回顶楼 | |