`
haole
  • 浏览: 657 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于netty4的beanstalkd的java客户端实现

阅读更多
最近实现了基于netty4的beanstalkd的客户端,  实现此客户端的目的是为了学习netty。
beanstalkd是一个高性能、轻量级的分布式内存队列系统,个人认为,如果需要一个轻量型的 中间件, beanstalkd是很不错的一个选择,协议也很简单。beanstalkd的详细介绍 可见
https://wenku.baidu.com/view/b9654077f242336c1eb95e54.html
针对消息中间件的概念,分为消息的提供者和消息的消费者,结合beanstalkd和netty的概念,beanstalkd中tube与netty中的channel一一对应。为了在同一个项目中消息的提供者与消息的消费者不相互影响,相同名称的tube,提供者和消费者使用不同的channel。
1. beanstalk协议的封装

  协议的封装参考了dinstone,首先定义接口表示协议的功能


package com.haole.mq.beanstalk.command;

import io.netty.buffer.ByteBuf;

import java.nio.charset.Charset;

/**
 * Created by shengjunzhao on 2017/5/27.
 */
public interface Command {

    String getCommandLine(); // 拼接协议的命令

    ByteBuf prepareRequest(ByteBuf sendBuf, Charset charset, String delimiter); // 编码命令到ByteBuf中

}


提取公共部分到抽象类中
package com.haole.mq.beanstalk.command;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

import java.nio.charset.Charset;

/**
 * Created by shengjunzhao on 2017/5/27.
 */
public class AbstractCommand implements Command {

    private String commandLine;


    @Override
    public String getCommandLine() {
        return this.commandLine;
    }

    @Override
    public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) {
        sendBuf.writeBytes(this.commandLine.getBytes(charset));
        sendBuf.writeBytes(delimiter.getBytes(charset));
        return sendBuf;
    }

    public void setCommandLine(String commandLine) {
        this.commandLine = commandLine;
    }
}

beanstalkd每一个协议对于一个具体的类,例如:put命令实现类
package com.haole.mq.beanstalk.command;

import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

/**
 * put command
 * Created by shengjunzhao on 2017/5/27.
 */
public class PutCommand extends AbstractCommand {

    private final static Logger log = LoggerFactory.getLogger(PutCommand.class);

    private byte[] data;

    public PutCommand(int priority, int delay, int ttr, byte[] data) {
        if (data == null) {
            throw new IllegalArgumentException("data is null");
        }
        if (data.length > 65536) {
            throw new IllegalArgumentException("data is too long than 65536");
        }
        setCommandLine("put " + priority + " " + delay + " " + ttr + " " + data.length);
        this.data = data;
    }

    @Override
    public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) {
        sendBuf = super.prepareRequest(sendBuf,charset,delimiter);
        sendBuf.writeBytes(data);
        sendBuf.writeBytes(delimiter.getBytes(charset));
        return sendBuf;
    }
}


消费类reserve实现
package com.haole.mq.beanstalk.command;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by shengjunzhao on 2017/5/29.
 */
public class ReserveCommand extends AbstractCommand {

    private final static Logger log = LoggerFactory.getLogger(ReserveCommand.class);

    public ReserveCommand(long timeout) {
        if (timeout > 0)
            setCommandLine("reserve-with-timeout " + timeout);
        else
            setCommandLine("reserve");
    }
}



2. 编码解码的实现

编码用于把beanstalkd的每一个协议转换为netty的ByteBuf
package com.haole.mq.beanstalk.codec;

import com.haole.mq.beanstalk.command.Command;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

/**
 * Created by shengjunzhao on 2017/5/28.
 */
public class CommandEncode extends MessageToByteEncoder<Command> {

    private static final Logger log = LoggerFactory.getLogger(CommandEncode.class);

    private Charset charset;
    private String delimiter;

    public CommandEncode(Charset charset) {
        this(charset, "\r\n");
    }

    public CommandEncode(Charset charset, String delimiter) {
        this.charset = charset;
        this.delimiter = delimiter;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
        if (null == msg) {
            throw new Exception("The encode message is null");
        }
        ByteBuf sendBuf = ctx.channel().alloc().buffer(512);
        log.debug("&&&&& command={}", msg.getCommandLine());
        sendBuf = msg.prepareRequest(sendBuf, charset, delimiter);
        out.writeBytes(sendBuf);
    }
}



解码用于把netty的ByteBuf转换为beanstalkd协议的响应。beanstalkd协议的响应结构相同,定义一个列来表示
package com.haole.mq.beanstalk.command;

/**
 * Created by shengjunzhao on 2017/5/27.
 */
public class Response {
    private String statusLine;
    private byte[] data;

    public String getStatusLine() {
        return statusLine;
    }

    public void setStatusLine(String statusLine) {
        this.statusLine = statusLine;
    }

    public byte[] getData() {
        return data;
    }

    public void setData(byte[] data) {
        this.data = data;
    }
}



解码器
package com.haole.mq.beanstalk.codec;

import com.haole.mq.beanstalk.command.Response;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by shengjunzhao on 2017/5/28.
 */
public class CommandDecode extends ByteToMessageDecoder {

    private final static Logger log = LoggerFactory.getLogger(CommandDecode.class);
    private Charset charset;
    private String delimiter;

    public CommandDecode(Charset charset) {
        this(charset, "\r\n");
    }

    public CommandDecode(Charset charset, String delimiter) {
        this.charset = charset;
        this.delimiter = delimiter;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();
        int readableBytes = in.readableBytes();
        Response response = new Response();
        byte[] resp = new byte[readableBytes];
        in.readBytes(resp);
        log.debug("bytebuf in {}",resp);
        byte previous = 0;
        boolean isReset = true;
        for (int i = 0; i < readableBytes; i++) {
            byte current = resp[i];
            if (previous == 13 && current == 10) {
                String commandLine = new String(resp, 0, i - 1, charset);
                String[] spilts = commandLine.split(" ");
                String result = spilts[0];
                if ("RESERVED".equals(result) || "FOUND".equals(result) || "OK".equals(result)) {
                    String bytesStr = spilts[spilts.length - 1];
                    if (bytesStr.matches("\\d+")) {
                        int bytes = Integer.valueOf(bytesStr);
                        if (bytes == readableBytes - i - 1 - 2) {
                            byte[] data = new byte[bytes];
                            System.arraycopy(resp, i + 1, data, 0, bytes);
                            response.setData(data);
                            isReset = false;
                        }
                    } else
                        isReset = false;
                } else
                    isReset = false;
                response.setStatusLine(commandLine);
                break;
            }
            previous = current;
        }
        if (isReset)
            in.resetReaderIndex();
        else {
            out.add(response);
        }
    }
}



3. netty 处理器

处理器接收netty解码后的响应保存到阻塞队列中

package com.haole.mq.beanstalk.handler;

import com.haole.mq.beanstalk.command.Command;
import com.haole.mq.beanstalk.command.Response;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.LinkedBlockingQueue;


/**
 * Created by shengjunzhao on 2017/5/28.
 */
public class BeanstalkHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(BeanstalkHandler.class);
    private LinkedBlockingQueue<Response> queue = new LinkedBlockingQueue<>();
    private Channel channel;



    public BeanstalkHandler() {}

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.channel=ctx.channel();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Response response = (Response) msg;
        queue.put(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    public Response sendMessage(Command command) throws InterruptedException {
        this.channel.writeAndFlush(command);
        return queue.take();
    }
}



此处使用了阻塞队列保存发送命令的响应。beanstalkd保证了按照顺序接收命令,并按照顺序处理命令。不会出现命令的响应与命令不一致的情况。 但是,netty无法保证请求和响应之间一一对应的关系,依赖于服务器的实现。如果服务器的实现是第三方的,例如像beanstalkd,则能保证一一对应;但是如果服务器对连接到自己的客户端channel的请求使用多线程来实现,则无法保证按照顺序实现一一对应,可能需要定义特殊的消息协议来保证。

4. netty的初始化

private void init() {
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("beanstalk decode", new CommandDecode(Charset.forName("UTF-8")));
                        pipeline.addLast("beanstalk encode", new CommandEncode(Charset.forName("UTF-8")));
                        pipeline.addLast("beanstalk client handler", new BeanstalkHandler());

                    }
                });

    }



5. beanstalkd 命令发送
beanstalkd的命令有10多个,以put为例

/**
     * 向队列中插入一个job
     *
     * @param channel
     * @param priority 优先级 0~2**32的整数,最高优先级是0
     * @param delay    是一个整形数,表示将job放入ready队列需要等待的秒数
     * @param ttr      time to run—是一个整形数,表示允许一个worker执行该job的秒数。这个时间将从一个worker 获取一个job开始计算。
     *                 如果该worker没能在<ttr> 秒内删除、释放或休眠该job,这个job就会超时,服务端会主动释放该job。
     *                 最小ttr为1。如果客户端设置了0,服务端会默认将其增加到1。
     * @param data     及job体,是一个长度为<byetes> 的字符序列
     * @return 如果大于0,是新job的数字编号,如果小于0,错误,-1;未知;-2:BURIED;-3:EXPECTED_CRLF;-4:JOB_TOO_BIG;-5:DRAINING
     * @throws InterruptedException
     */
    public long put(Channel channel, int priority, int delay, int ttr, byte[] data) throws InterruptedException {
        Command putCommand = new PutCommand(priority, delay, ttr, data);
        Response response = channel.pipeline().get(BeanstalkHandler.class).sendMessage(putCommand);
        log.debug("response status {}", response.getStatusLine());
        String[] spilts = response.getStatusLine().split(" ");
        if ("INSERTED".equals(spilts[0])) {
            return Long.valueOf(spilts[1]).longValue();
        } else if ("BURIED".equals(spilts[0])) {
            return -2;
        } else if ("EXPECTED_CRLF".equals(spilts[0])) {
            return -3;
        } else if ("JOB_TOO_BIG".equals(spilts[0])) {
            return -4;
        } else if ("DRAINING".equals(spilts[0])) {
            return -5;
        } else
            return -1;
    }



6. 消息提供者和消费者的初始化和退出
在一个项目中可以单独使用提供者或者消费者,也可以同时使用。此客户端支持简单的beanstalkd集群,同时部署多个beanstalkd服务器,初始化时指定多个服务器的地址,连接是按照tube选择具体的服务器。

Set<String> servers = new HashSet<>();
        servers.add("192.168.209.132:11300");
        servers.add("192.168.209.133:11300");
        servers.add("192.168.209.134:11300");
        BeanstalkProvider provider1 = new DefaultBeanstalkProvider(servers, "beanstalks1");

BeanstalkConsumer consumer1 = new DefaultBeanstalkConsumer(servers, "beanstalks1");


初始化后,就可以使用provider或者consumer执行beanstalkd命令。
最后需要调用quit(),退出到服务器的连接

provider1.quit();
consumer1.quit();


具体实现已经上传到GitHub中,https://github.com/shengjunzhao/beanstalk4j
文中不足之处, 请各位大侠指正。
分享到:
评论
1 楼 smart152829 2017-07-01  

很不错的文章,最近看了看了夜行侠老师讲的Netty深入浅出源码剖析,里面用到了长连接高并发,在测试的时候,性能真的比之前的tomcat那些容器好几十倍

相关推荐

Global site tag (gtag.js) - Google Analytics