论坛首页 Java企业应用论坛

Netty + Protobuf 的客户端模式运用和问题探讨

浏览 8846 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
作者 正文
   发表时间:2012-05-14  

使用NETTY之前,当然需要先看一下所带的samples。

 

简单的hello world,可能大家都没啥感觉,觉得NETTY其实很简单:

 

1. 对于服务器端,需要写下面几个:

a. 写个ServerHandler,来接收并处理服务端业务逻辑;

b. 照葫芦画瓢整个Pineline,比如ServerPipelineFactory,把一些现成的和自己的ServerHandler串糖葫芦那样串起来;

c. 最后写个简单的Server,把ServerBootstrap和ServerPipelineFactory装起来;

d. 好吧,再加一些优化/适合的参数,比如child.tcpNoDelay,child.keepAlive之类的。

典型代码如下:

ServerBootstrap bootstrap = new ServerBootstrap(
                new NioServerSocketChannelFactory(
                		Executors.newCachedThreadPool(),
                		Executors.newCachedThreadPool(),
                		threadpool_worker)
                );

        // Parameters for tuning
        bootstrap.setOption("child.tcpNoDelay", true);
        //bootstrap.setOption("child.keepAlive", true);
        
        // Set up the event pipeline factory.
        bootstrap.setPipelineFactory(serverPipelineFactory);

        // Bind and start to accept incoming connections.
        bootstrap.bind(new InetSocketAddress(port));
        
        // please use System.out directly here. DON'T CHANGE IT!!!
        System.out.println("====Server is ready to rock on port:" + port + "====");
 

 

2. 对于客户端,其实与服务器端非常相似,如下:

a. 写个ClientHandler,来接收并处理客户端业务逻辑;

b. 照葫芦画瓢整个Pineline,比如ClientPipelineFactory,把一些现成的和自己的ClientHandler串糖葫芦那样串起来;

c. 最后写个简单的Client,把ClientBootstrap和ClientPipelineFactory装起来。

典型代码如下:

// Set up.
        bootstrap = new ClientBootstrap(
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));

        // Configure the event pipeline factory.
        bootstrap.setPipelineFactory(new ClientPipelineFactory());

        // Make a new connection.
        logger.debug("trying to connect to host[{}] and port[{}]...", host, port);
        channelFuture = bootstrap.connect(new InetSocketAddress(host, port));

        // Wait until the connection is made successfully.
        channel = channelFuture.awaitUninterruptibly().getChannel();
        logger.debug("successfully connected to host[{}] and port[{}]!", host, port);
 

一般而言,hello world就可以玩了。

 

 

但对于实战性的应用开发,问题才刚刚开始:

1. 针对NETTY,合理的设计模式如何运用?

对于服务端,往往是在ServerHandler里,接收=》处理=》write,一般关照好messageReceived方法即可,比较简单,至于如何搞定你的业务逻辑设计,跟NETTY无关,在此不谈。

对于客户端,我们不得不关心,有几个要点:

a. 收发是异步的,如果发送者只管发而不管后果,这也可以不谈,跟Server端一样,非常简单

b. 如果发送者发了还要管收,这就来劲了,怎么玩?这是我们要探讨的第一个问题,我先抛出来,接下来再议

 

2. 为了引入高性能的binary protocol, 引入了google的protobuf,暂时没发现问题,如果是同构平台(java)的话,堪称完美,但由于netty针对protocol有专门的Decoder/Encoder。

问题二是:我google了半天,好像没有教好的案例,可以实现异构平台(如.net)访问NETTY + Protobuf的参考?

 

 

各位如有经验的,可以来分享、讨论,我会继续跟进。

   发表时间:2012-05-14  

第一个问题涉及的是blocking request/response的需求。

为了抛砖引玉,我先来分析一下例子里面的做法,以localtime为例。

套路如下:
1. 客户端建立连接后,给服务端发送消息。

LocalTimeClient.java

        // Configure the event pipeline factory.
        bootstrap.setPipelineFactory(new LocalTimeClientPipelineFactory());

        // Make a new connection.
        ChannelFuture connectFuture =
            bootstrap.connect(new InetSocketAddress(host, port));

        // Wait until the connection is made successfully.
        Channel channel = connectFuture.awaitUninterruptibly().getChannel();

        // Get the handler instance to initiate the request.
        LocalTimeClientHandler handler =
            channel.getPipeline().get(LocalTimeClientHandler.class);

        // Request and get the response.
        List<String> response = handler.getLocalTimes(cities);
        // Close the connection.
        channel.close().awaitUninterruptibly();

        // Shut down all thread pools to exit.
        bootstrap.releaseExternalResources();

 

2. 在LocalTimeClientHandler里面,放一个BlockingQueue,这样所有收到的消息讲offer到这个queue里,发送端侦听这个queue就可以了。

public class LocalTimeClientHandler extends SimpleChannelUpstreamHandler {

    ...

    private final BlockingQueue<LocalTimes> answer = new LinkedBlockingQueue<LocalTimes>();

    public List<String> getLocalTimes(Collection<String> cities) {
        Locations.Builder builder = Locations.newBuilder();

        for (String c: cities) {
            String[] components = c.split("/");
            builder.addLocation(Location.newBuilder().
                setContinent(Continent.valueOf(components[0].toUpperCase())).
                setCity(components[1]).build());
        }

        channel.write(builder.build());

        LocalTimes localTimes;
        boolean interrupted = false;
        for (;;) {
            try {
                localTimes = answer.take();
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        List<String> result = new ArrayList<String>();
        for (LocalTime lt: localTimes.getLocalTimeList()) {
            result.add(
                    new Formatter().format(
                            "%4d-%02d-%02d %02d:%02d:%02d %s",
                            lt.getYear(),
                            lt.getMonth(),
                            lt.getDayOfMonth(),
                            lt.getHour(),
                            lt.getMinute(),
                            lt.getSecond(),
                            lt.getDayOfWeek().name()).toString());
        }

        return result;
    }

    ...
}

 

 

刚开始我以为这是“标准模式”,可后来发现性能瓶颈惨的吓人,仔细分析一下就知道了:

1. 这个queue是block queue,所以offer到这个queue以后,将串行压栈;

2. 发送者面对一个长长的queue,不知道谁才是自己要等的人

 

另外也看到有人在咨询这个问题,最有价值的参考之一是这里:

Blocking request/response with Netty

但遗憾的是,这个可以解决上述问题2,即发送者可以等到自己要的人,但也没法解决高并发需求,同样沿用:发送->加锁->接收->处理响应->解锁的套路。

 

 

 

0 请登录后投票
   发表时间:2012-05-29  
楼主同学,现在找到好的实现模式了么?我也正在纠结,要利用netty实现的是顺序化的一问一答的频繁短连接,,,觉得那个ChannelHandler反而很碍事,想不到好办法。。。。
0 请登录后投票
   发表时间:2012-05-29  
whatable 写道
楼主同学,现在找到好的实现模式了么?我也正在纠结,要利用netty实现的是顺序化的一问一答的频繁短连接,,,觉得那个ChannelHandler反而很碍事,想不到好办法。。。。




我目前有个初稿,在前述思路的基础上进行了改进,要点如下:


1.在ClientHandler中加了一个ConcurrentMap,作为请求<->响应的消息绑定点。

 

public static final ConcurrentMap<String, 
    	LinkedBlockingQueue<MatchResponse>> 
    	QUEUE_MAP = new ConcurrentHashMap<String, 
    		LinkedBlockingQueue<MatchResponse>>();

特别要指出的是,这个Map里面的值对象,是Queue.

 

 

2. Client在发送前,创建一个Queue并侦听

 

try{
	        queue = new LinkedBlockingQueue<MatchResponse>();
	        ClientHandler.QUEUE_MAP.put(key, queue);
	
	        this.getChannel().write(builder.build());
	        matchResponse = waitAndGetMatchResponse(key, queue);
}catch(ClosedMatchException e){
	    	logger.error(e.getMessage(), e);
	    	throw e;
}finally{
	    	queue = null;
	    	ClientHandler.QUEUE_MAP.remove(key);
}

 

再看看waitAndGetMatchResponse方法:

private MatchResponse waitAndGetMatchResponse(
			String key,
			LinkedBlockingQueue<MatchResponse> queue) 
					throws ClosedMatchException{
		MatchResponse matchResponse = null;
		boolean interrupted = false;
        for (;;) {
            try {
            	//REMOVE ME after benchmark
            	//System.out.println("==>Current QUEUE_MAP size: " + ClientHandler.QUEUE_MAP.size());
            	logger.debug("Current QUEUE_MAP size: {}", ClientHandler.QUEUE_MAP.size());
            	matchResponse = queue.poll(Constants.RESPONSE_TIMEOUT, TimeUnit.SECONDS);
            	
            	if(matchResponse == null){
            		//timeout
            		throw new ClosedMatchException("Response time out after [" + Constants.RESPONSE_TIMEOUT + "] seconds.");
            	}
            	
            	if(matchResponse.getError().getMessage()!=null
            			&& matchResponse.getError().getMessage().length()>0){
            		//error message
            		Error error = (Error) matchResponse.getError();
            		throw new ClosedMatchException(this.buildError(error));
            	}
                break;
            } catch (InterruptedException e) {
                interrupted = true;
            }
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        
        return Converter.convertResponse(matchResponse);
	}

 

3. 最后看看怎么接受消息

 

@Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
    	MatchResponse
    		message = (MatchResponse)e.getMessage();
    	String key = message.getKey();
    	if(key!=null && key.length()>0){
	    	BlockingQueue<MatchResponse> 
	    		queue = QUEUE_MAP.get(message.getKey());
	    	queue.offer(message);
    	}else{
    		//no key's coming back? This is caused by exceptionCaught in ServerHandler while debugging
    		//it never happens here in production env
    	}
    }
 

 

That's it.

 

 

这里简单总结一下:

1. 同样还是send->block->get response,但一个请求会启用一个最短命的Queue;

2. 使用ConcurrentMap来衔接类似的请求,以达到提高并发的目的。

 

PS:这些客户端的小伎俩,与服务器端无关,唯一要考虑的仅仅是ConcurrentMap的KEY,作为两者衔接的桥梁。

 

仅供参考!

0 请登录后投票
   发表时间:2012-05-30  
基本上和你的思路一致,细节上有些小出入。

我这边也是客户端的问题,原本有个基于netty默认的回调式API供GUI调用,现在要封装出一个阻塞式顺序调用的API,,,,目的是为了供脚本调用(多年积累的大批量已经写好的脚本,不可能全都改成回调触发式),,,

我今天上午刚刚写出来一个雏形——现在也是发送消息之前暂存一个自定义的callback对象(等同于你那个queue),(设置了守护线程用来剔除超时的callback),,,,消息发送之后立即利用callback的wait()挂起当前线程,等读到服务器端消息后再查找到相应的callback,做notify(),这样发送的线程就又重新工作了。。。。这样达成了一问一答的阻塞API,基本功能测试通过,但没有通过压力测试,,,,经常会出现不能正确唤醒挂起的线程的问题。我研究并借鉴你写的细节再完善一下试试看~~

感谢分享!!!!
0 请登录后投票
   发表时间:2012-05-31  
whatable 写道
基本上和你的思路一致,细节上有些小出入。

我这边也是客户端的问题,原本有个基于netty默认的回调式API供GUI调用,现在要封装出一个阻塞式顺序调用的API,,,,目的是为了供脚本调用(多年积累的大批量已经写好的脚本,不可能全都改成回调触发式),,,

我今天上午刚刚写出来一个雏形——现在也是发送消息之前暂存一个自定义的callback对象(等同于你那个queue),(设置了守护线程用来剔除超时的callback),,,,消息发送之后立即利用callback的wait()挂起当前线程,等读到服务器端消息后再查找到相应的callback,做notify(),这样发送的线程就又重新工作了。。。。这样达成了一问一答的阻塞API,基本功能测试通过,但没有通过压力测试,,,,经常会出现不能正确唤醒挂起的线程的问题。我研究并借鉴你写的细节再完善一下试试看~~

感谢分享!!!!


也感谢你的分享。你的callback跟我使用的blockingqueue是一样的,只要做出基本的容错处理即可。
我这种模式即便在单一server node的时候,吞吐量也能达到比较满意的基线(含复杂业务处理的情况下达到100w tx /day),且非常稳定。


其实发送=>结束 + 接收=>回调 的模式,是典型的收发各不知会的模式,适应于需要大throughput的业务场景。
这种业务场景对于异常处理的套路会有点不一样,可能需要引入合理的业务补偿机制,确保具备合理的容错能力。

而我所要解决的问题视角有点不一样,我是站在客户端的角度来看问题,需要解决的是单一客户端在阻塞模式如何提高throughput。
这种业务场景关注于高并发下的快速应答,容错能力客户端调用方根据应答自行解决。
0 请登录后投票
   发表时间:2012-06-01  
我用mina也做过这种需求,差不多这个思路,继承defaultIofuture,wait/notify
0 请登录后投票
   发表时间:2012-12-18  
可以看看netty中 OrderedMemoryAwareThreadPoolExecutor
这个貌似能解决你们所说的顺序问题,针对每个channel是独立阻塞
0 请登录后投票
论坛首页 Java企业应用版

跳转论坛:
Global site tag (gtag.js) - Google Analytics