精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2012-05-14
使用NETTY之前,当然需要先看一下所带的samples。
简单的hello world,可能大家都没啥感觉,觉得NETTY其实很简单:
1. 对于服务器端,需要写下面几个: a. 写个ServerHandler,来接收并处理服务端业务逻辑; b. 照葫芦画瓢整个Pineline,比如ServerPipelineFactory,把一些现成的和自己的ServerHandler串糖葫芦那样串起来; c. 最后写个简单的Server,把ServerBootstrap和ServerPipelineFactory装起来; d. 好吧,再加一些优化/适合的参数,比如child.tcpNoDelay,child.keepAlive之类的。 典型代码如下: ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), threadpool_worker) ); // Parameters for tuning bootstrap.setOption("child.tcpNoDelay", true); //bootstrap.setOption("child.keepAlive", true); // Set up the event pipeline factory. bootstrap.setPipelineFactory(serverPipelineFactory); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(port)); // please use System.out directly here. DON'T CHANGE IT!!! System.out.println("====Server is ready to rock on port:" + port + "====");
2. 对于客户端,其实与服务器端非常相似,如下: a. 写个ClientHandler,来接收并处理客户端业务逻辑; b. 照葫芦画瓢整个Pineline,比如ClientPipelineFactory,把一些现成的和自己的ClientHandler串糖葫芦那样串起来; c. 最后写个简单的Client,把ClientBootstrap和ClientPipelineFactory装起来。 典型代码如下: // Set up. bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Configure the event pipeline factory. bootstrap.setPipelineFactory(new ClientPipelineFactory()); // Make a new connection. logger.debug("trying to connect to host[{}] and port[{}]...", host, port); channelFuture = bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection is made successfully. channel = channelFuture.awaitUninterruptibly().getChannel(); logger.debug("successfully connected to host[{}] and port[{}]!", host, port); 一般而言,hello world就可以玩了。
但对于实战性的应用开发,问题才刚刚开始: 1. 针对NETTY,合理的设计模式如何运用? 对于服务端,往往是在ServerHandler里,接收=》处理=》write,一般关照好messageReceived方法即可,比较简单,至于如何搞定你的业务逻辑设计,跟NETTY无关,在此不谈。 对于客户端,我们不得不关心,有几个要点: a. 收发是异步的,如果发送者只管发而不管后果,这也可以不谈,跟Server端一样,非常简单 b. 如果发送者发了还要管收,这就来劲了,怎么玩?这是我们要探讨的第一个问题,我先抛出来,接下来再议
2. 为了引入高性能的binary protocol, 引入了google的protobuf,暂时没发现问题,如果是同构平台(java)的话,堪称完美,但由于netty针对protocol有专门的Decoder/Encoder。 问题二是:我google了半天,好像没有教好的案例,可以实现异构平台(如.net)访问NETTY + Protobuf的参考?
各位如有经验的,可以来分享、讨论,我会继续跟进。 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2012-05-14
第一个问题涉及的是blocking request/response的需求。 LocalTimeClient.java // Configure the event pipeline factory. bootstrap.setPipelineFactory(new LocalTimeClientPipelineFactory()); // Make a new connection. ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection is made successfully. Channel channel = connectFuture.awaitUninterruptibly().getChannel(); // Get the handler instance to initiate the request. LocalTimeClientHandler handler = channel.getPipeline().get(LocalTimeClientHandler.class); // Request and get the response. List<String> response = handler.getLocalTimes(cities); // Close the connection. channel.close().awaitUninterruptibly(); // Shut down all thread pools to exit. bootstrap.releaseExternalResources();
2. 在LocalTimeClientHandler里面,放一个BlockingQueue,这样所有收到的消息讲offer到这个queue里,发送端侦听这个queue就可以了。 public class LocalTimeClientHandler extends SimpleChannelUpstreamHandler { ... private final BlockingQueue<LocalTimes> answer = new LinkedBlockingQueue<LocalTimes>(); public List<String> getLocalTimes(Collection<String> cities) { Locations.Builder builder = Locations.newBuilder(); for (String c: cities) { String[] components = c.split("/"); builder.addLocation(Location.newBuilder(). setContinent(Continent.valueOf(components[0].toUpperCase())). setCity(components[1]).build()); } channel.write(builder.build()); LocalTimes localTimes; boolean interrupted = false; for (;;) { try { localTimes = answer.take(); break; } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } List<String> result = new ArrayList<String>(); for (LocalTime lt: localTimes.getLocalTimeList()) { result.add( new Formatter().format( "%4d-%02d-%02d %02d:%02d:%02d %s", lt.getYear(), lt.getMonth(), lt.getDayOfMonth(), lt.getHour(), lt.getMinute(), lt.getSecond(), lt.getDayOfWeek().name()).toString()); } return result; } ... }
刚开始我以为这是“标准模式”,可后来发现性能瓶颈惨的吓人,仔细分析一下就知道了: 1. 这个queue是block queue,所以offer到这个queue以后,将串行压栈; 2. 发送者面对一个长长的queue,不知道谁才是自己要等的人
另外也看到有人在咨询这个问题,最有价值的参考之一是这里: Blocking request/response with Netty 但遗憾的是,这个可以解决上述问题2,即发送者可以等到自己要的人,但也没法解决高并发需求,同样沿用:发送->加锁->接收->处理响应->解锁的套路。
|
|
返回顶楼 | |
发表时间:2012-05-29
楼主同学,现在找到好的实现模式了么?我也正在纠结,要利用netty实现的是顺序化的一问一答的频繁短连接,,,觉得那个ChannelHandler反而很碍事,想不到好办法。。。。
|
|
返回顶楼 | |
发表时间:2012-05-29
whatable 写道
楼主同学,现在找到好的实现模式了么?我也正在纠结,要利用netty实现的是顺序化的一问一答的频繁短连接,,,觉得那个ChannelHandler反而很碍事,想不到好办法。。。。
public static final ConcurrentMap<String, LinkedBlockingQueue<MatchResponse>> QUEUE_MAP = new ConcurrentHashMap<String, LinkedBlockingQueue<MatchResponse>>(); 特别要指出的是,这个Map里面的值对象,是Queue.
2. Client在发送前,创建一个Queue并侦听
try{ queue = new LinkedBlockingQueue<MatchResponse>(); ClientHandler.QUEUE_MAP.put(key, queue); this.getChannel().write(builder.build()); matchResponse = waitAndGetMatchResponse(key, queue); }catch(ClosedMatchException e){ logger.error(e.getMessage(), e); throw e; }finally{ queue = null; ClientHandler.QUEUE_MAP.remove(key); }
再看看waitAndGetMatchResponse方法: private MatchResponse waitAndGetMatchResponse( String key, LinkedBlockingQueue<MatchResponse> queue) throws ClosedMatchException{ MatchResponse matchResponse = null; boolean interrupted = false; for (;;) { try { //REMOVE ME after benchmark //System.out.println("==>Current QUEUE_MAP size: " + ClientHandler.QUEUE_MAP.size()); logger.debug("Current QUEUE_MAP size: {}", ClientHandler.QUEUE_MAP.size()); matchResponse = queue.poll(Constants.RESPONSE_TIMEOUT, TimeUnit.SECONDS); if(matchResponse == null){ //timeout throw new ClosedMatchException("Response time out after [" + Constants.RESPONSE_TIMEOUT + "] seconds."); } if(matchResponse.getError().getMessage()!=null && matchResponse.getError().getMessage().length()>0){ //error message Error error = (Error) matchResponse.getError(); throw new ClosedMatchException(this.buildError(error)); } break; } catch (InterruptedException e) { interrupted = true; } } if (interrupted) { Thread.currentThread().interrupt(); } return Converter.convertResponse(matchResponse); }
3. 最后看看怎么接受消息
@Override public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) { MatchResponse message = (MatchResponse)e.getMessage(); String key = message.getKey(); if(key!=null && key.length()>0){ BlockingQueue<MatchResponse> queue = QUEUE_MAP.get(message.getKey()); queue.offer(message); }else{ //no key's coming back? This is caused by exceptionCaught in ServerHandler while debugging //it never happens here in production env } }
That's it.
这里简单总结一下: 1. 同样还是send->block->get response,但一个请求会启用一个最短命的Queue; 2. 使用ConcurrentMap来衔接类似的请求,以达到提高并发的目的。
PS:这些客户端的小伎俩,与服务器端无关,唯一要考虑的仅仅是ConcurrentMap的KEY,作为两者衔接的桥梁。
仅供参考! |
|
返回顶楼 | |
发表时间:2012-05-30
基本上和你的思路一致,细节上有些小出入。
我这边也是客户端的问题,原本有个基于netty默认的回调式API供GUI调用,现在要封装出一个阻塞式顺序调用的API,,,,目的是为了供脚本调用(多年积累的大批量已经写好的脚本,不可能全都改成回调触发式),,, 我今天上午刚刚写出来一个雏形——现在也是发送消息之前暂存一个自定义的callback对象(等同于你那个queue),(设置了守护线程用来剔除超时的callback),,,,消息发送之后立即利用callback的wait()挂起当前线程,等读到服务器端消息后再查找到相应的callback,做notify(),这样发送的线程就又重新工作了。。。。这样达成了一问一答的阻塞API,基本功能测试通过,但没有通过压力测试,,,,经常会出现不能正确唤醒挂起的线程的问题。我研究并借鉴你写的细节再完善一下试试看~~ 感谢分享!!!! |
|
返回顶楼 | |
发表时间:2012-05-31
whatable 写道 基本上和你的思路一致,细节上有些小出入。
我这边也是客户端的问题,原本有个基于netty默认的回调式API供GUI调用,现在要封装出一个阻塞式顺序调用的API,,,,目的是为了供脚本调用(多年积累的大批量已经写好的脚本,不可能全都改成回调触发式),,, 我今天上午刚刚写出来一个雏形——现在也是发送消息之前暂存一个自定义的callback对象(等同于你那个queue),(设置了守护线程用来剔除超时的callback),,,,消息发送之后立即利用callback的wait()挂起当前线程,等读到服务器端消息后再查找到相应的callback,做notify(),这样发送的线程就又重新工作了。。。。这样达成了一问一答的阻塞API,基本功能测试通过,但没有通过压力测试,,,,经常会出现不能正确唤醒挂起的线程的问题。我研究并借鉴你写的细节再完善一下试试看~~ 感谢分享!!!! 也感谢你的分享。你的callback跟我使用的blockingqueue是一样的,只要做出基本的容错处理即可。 我这种模式即便在单一server node的时候,吞吐量也能达到比较满意的基线(含复杂业务处理的情况下达到100w tx /day),且非常稳定。 其实发送=>结束 + 接收=>回调 的模式,是典型的收发各不知会的模式,适应于需要大throughput的业务场景。 这种业务场景对于异常处理的套路会有点不一样,可能需要引入合理的业务补偿机制,确保具备合理的容错能力。 而我所要解决的问题视角有点不一样,我是站在客户端的角度来看问题,需要解决的是单一客户端在阻塞模式如何提高throughput。 这种业务场景关注于高并发下的快速应答,容错能力客户端调用方根据应答自行解决。 |
|
返回顶楼 | |
发表时间:2012-06-01
我用mina也做过这种需求,差不多这个思路,继承defaultIofuture,wait/notify
|
|
返回顶楼 | |
发表时间:2012-12-18
可以看看netty中 OrderedMemoryAwareThreadPoolExecutor
这个貌似能解决你们所说的顺序问题,针对每个channel是独立阻塞 |
|
返回顶楼 | |
浏览 8838 次