`
Poechant
  • 浏览: 229450 次
博客专栏
Bebe66e7-3a30-3fc9-aeea-cfa3b474b591
Nginx高性能Web服务...
浏览量:24307
5738817b-23a1-3a32-86de-632d7da73b1e
Cumulus实时媒体服务...
浏览量:22095
社区版块
存档分类
最新评论

Practical Netty (5) TCP反向代理服务器

 
阅读更多

Practical Netty (5) TCP反向代理服务器

以下针对 TCP 反向代理服务器。

1. 前端连接被创建时,创建后端连接

一个平凡的 ServerBootstrap 会有如下的一个语句:

serverBootstrap.setPipelineFactory(
    new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline p = pipeline();
            p.addLast("handler", new PoechantProxyHandler());
            return p;
        }
    });

这个PoechantProxyHandler如何实现,就成了关键了。

每个 connection 建立后,会创建一个 channel 专门用于服务这个连接。此时会响应ChannelPipelineHandler的此方法:

public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)

这时你可以为这个 connection(不妨称其为前端连接),创建一个与后端连接的 connection(不妨称其为后端连接)。此时对于后端服务器,你要扮演的是 client 的角色,所以需要一个 ClientBootstrap。该 client 连接成功后,就可以从前端连接中读取数据了。

private volatile Channel outboundChannel;

public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
    
    final Channel inboundChannel = e.getChannel();
    inboundChannel.setReadable(false);

    ClientBootstrap cb = new ClientBootstrap(cf);
    cb.getPipeline().addLast("handler", new BackendChannelHandler(e.getChannel()));
    ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));

    outboundChannel = f.getChannel();
    f.addListener(new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                inboundChannel.setReadable(true);
            } else {
                inboundChannel.close();
            }
        }
    });
}

ClientBootstrap.connect后会创建一个 Channel,与后端服务器连接。对于 ClientBootstrap 是不存在 parent channel 和 child channel 这样需要考虑的策略的。

转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom

2. 前端接收的消息,转发给后端

亲,请边看下面的代码,边读我写的这段话。msg是从前端Channel(因为这是一个为前端ServerBootstrap服务的 ChannelPipelineHandler)拿到的消息,然后把它写入到后端连接的ChanneloutboundChannel就是前面我们在前端连接被建立时创建的后端Channel)。

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {
    ChannelBuffer msg = (ChannelBuffer) e.getMessage();
    synchronized (trafficLock) {
        outboundChannel.write(msg);
        if (!outboundChannel.isWritable()) {
            e.getChannel().setReadable(false);
        }
    }
}

2.1. 对后端通道的操作,要做同步

需要注意的是,凡是对 outboundHandler 的操作都是需要同步的,因为 PoechantProxyHandler 中的 outboundHandler 是 volatile,就是异变的,为什么呢?因为它的OP_READ(相关的讨论可以参见这里)可能被改变,所以要同步。所以这里就用到了 java 的 synchronized 同步代码块,在每个 ProchantProxyHandler 的实例身上都有一个trafficLock成员,当做锁来使用,这是一个 Object,如下:

final Object trafficLock = new Object();

2.2. 后端不可写,则前端不要读

当然,说的就是这段:

if (!outboundChannel.isWritable()) {
    e.getChannel().setReadable(false);
}

转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom

3. 连接可操作状态改变时触发 channelInterestChanged

public void channelInterestChanged(ChannelHandlerContext ctx,
                          ChannelStateEvent e)
                            throws Exception

简单说,就是“This function was invoked when aChannel'sinterestOpswas changed.” 那什么是interestOps呢?

The interestOps value which tells that only read operation has been suspended.

  • org.jboss.netty.channel.Channel.OP_NONE
  • org.jboss.netty.channel.Channel.OP_READ
  • org.jboss.netty.channel.Channel.OP_WRITE
  • org.jboss.netty.channel.Channel.OP_READ_WRITE

他们的取值范围如下:

Modifier and Type Constant Field Value
public static final int OP_NONE 0 = 0000
public static final int OP_READ 1 = 0001
public static final int OP_WRITE 4 = 0100
public static final int OP_READ_WRITE 5 = 0101

前端连接可写时,后端连接就需要可读。

@Override
public void channelInterestChanged(ChannelHandlerContext ctx,
        ChannelStateEvent e) throws Exception {
    synchronized (trafficLock) {
        if (e.getChannel().isWritable()) {
            if (outboundChannel != null) {
                outboundChannel.setReadable(true);
            }
        }
    }
}

转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom

4. 后端连接的 ChannelPipelineHandler

后端连接有相应的 Handler,在创建后端连接时:

ClientBootstrap cb = new ClientBootstrap(cf);
cb.getPipeline().addLast("handler", new OutboundHandler(e.getChannel()));

所以可如下定义:

private class OutboundHandler extends SimpleChannelUpstreamHandler {

    private final Channel inboundChannel;

    OutboundHandler(Channel inboundChannel) {
        this.inboundChannel = inboundChannel;
    }

    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
            throws Exception {…}
    public void channelInterestChanged(ChannelHandlerContext ctx,
            ChannelStateEvent e) throws Exception {…}
}

其中 messageReceived 和 channelInterestChanged 的实现方式如下:

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {
    ChannelBuffer msg = (ChannelBuffer) e.getMessage();
    synchronized (trafficLock) {
        inboundChannel.write(msg);
        if (!inboundChannel.isWritable()) {
            e.getChannel().setReadable(false);
        }
    }
}
@Override
public void channelInterestChanged(ChannelHandlerContext ctx,
        ChannelStateEvent e) throws Exception {
    synchronized (trafficLock) {
        if (e.getChannel().isWritable()) {
            inboundChannel.setReadable(true);
        }
    }
}

这里可以概括一下。

if and if then
frontendchannelOpen N/A set frontend writable
frontendmessageReceived backend nonwritable set frontend nonreadable
frontendchannelInterestChanged frontend writable set backend readable
backendmessageReceived frontend nonwritable set backend nonreadable
backendchannelInterestChanged backend writable set frontend readable

可以看到:

  • 当一方收到消息时,关心的是另一方可不可写,如果另一方不可写则设置该方不可读(因为收到消息,自己一定是可读的,而对方的读状态不需考虑);
  • 当一方interestOps改变时,关心的是自己是否变成可写,如果自己可写则设置对方可读(因为自己的读状态,只需要到messageReceived时考虑即可,而对于自己变成不可写这种情况,在messageReceived中已经考虑了)。

-

转载请注明来自柳大的CSDN博客:Blog.CSDN.net/Poechant,微博:weibo.com/lauginhom

-

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics