`
haole
  • 浏览: 650 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

基于netty4的beanstalkd的java客户端实现

阅读更多
最近实现了基于netty4的beanstalkd的客户端,  实现此客户端的目的是为了学习netty。
beanstalkd是一个高性能、轻量级的分布式内存队列系统,个人认为,如果需要一个轻量型的 中间件, beanstalkd是很不错的一个选择,协议也很简单。beanstalkd的详细介绍 可见
https://wenku.baidu.com/view/b9654077f242336c1eb95e54.html
针对消息中间件的概念,分为消息的提供者和消息的消费者,结合beanstalkd和netty的概念,beanstalkd中tube与netty中的channel一一对应。为了在同一个项目中消息的提供者与消息的消费者不相互影响,相同名称的tube,提供者和消费者使用不同的channel。
1. beanstalk协议的封装

  协议的封装参考了dinstone,首先定义接口表示协议的功能


package com.haole.mq.beanstalk.command;

import io.netty.buffer.ByteBuf;

import java.nio.charset.Charset;

/**
 * Created by shengjunzhao on 2017/5/27.
 */
public interface Command {

    String getCommandLine(); // 拼接协议的命令

    ByteBuf prepareRequest(ByteBuf sendBuf, Charset charset, String delimiter); // 编码命令到ByteBuf中

}


提取公共部分到抽象类中
package com.haole.mq.beanstalk.command;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

import java.nio.charset.Charset;

/**
 * Created by shengjunzhao on 2017/5/27.
 */
public class AbstractCommand implements Command {

    private String commandLine;


    @Override
    public String getCommandLine() {
        return this.commandLine;
    }

    @Override
    public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) {
        sendBuf.writeBytes(this.commandLine.getBytes(charset));
        sendBuf.writeBytes(delimiter.getBytes(charset));
        return sendBuf;
    }

    public void setCommandLine(String commandLine) {
        this.commandLine = commandLine;
    }
}

beanstalkd每一个协议对于一个具体的类,例如:put命令实现类
package com.haole.mq.beanstalk.command;

import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

/**
 * put command
 * Created by shengjunzhao on 2017/5/27.
 */
public class PutCommand extends AbstractCommand {

    private final static Logger log = LoggerFactory.getLogger(PutCommand.class);

    private byte[] data;

    public PutCommand(int priority, int delay, int ttr, byte[] data) {
        if (data == null) {
            throw new IllegalArgumentException("data is null");
        }
        if (data.length > 65536) {
            throw new IllegalArgumentException("data is too long than 65536");
        }
        setCommandLine("put " + priority + " " + delay + " " + ttr + " " + data.length);
        this.data = data;
    }

    @Override
    public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) {
        sendBuf = super.prepareRequest(sendBuf,charset,delimiter);
        sendBuf.writeBytes(data);
        sendBuf.writeBytes(delimiter.getBytes(charset));
        return sendBuf;
    }
}


消费类reserve实现
package com.haole.mq.beanstalk.command;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Created by shengjunzhao on 2017/5/29.
 */
public class ReserveCommand extends AbstractCommand {

    private final static Logger log = LoggerFactory.getLogger(ReserveCommand.class);

    public ReserveCommand(long timeout) {
        if (timeout > 0)
            setCommandLine("reserve-with-timeout " + timeout);
        else
            setCommandLine("reserve");
    }
}



2. 编码解码的实现

编码用于把beanstalkd的每一个协议转换为netty的ByteBuf
package com.haole.mq.beanstalk.codec;

import com.haole.mq.beanstalk.command.Command;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;

/**
 * Created by shengjunzhao on 2017/5/28.
 */
public class CommandEncode extends MessageToByteEncoder<Command> {

    private static final Logger log = LoggerFactory.getLogger(CommandEncode.class);

    private Charset charset;
    private String delimiter;

    public CommandEncode(Charset charset) {
        this(charset, "\r\n");
    }

    public CommandEncode(Charset charset, String delimiter) {
        this.charset = charset;
        this.delimiter = delimiter;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception {
        if (null == msg) {
            throw new Exception("The encode message is null");
        }
        ByteBuf sendBuf = ctx.channel().alloc().buffer(512);
        log.debug("&&&&& command={}", msg.getCommandLine());
        sendBuf = msg.prepareRequest(sendBuf, charset, delimiter);
        out.writeBytes(sendBuf);
    }
}



解码用于把netty的ByteBuf转换为beanstalkd协议的响应。beanstalkd协议的响应结构相同,定义一个列来表示
package com.haole.mq.beanstalk.command;

/**
 * Created by shengjunzhao on 2017/5/27.
 */
public class Response {
    private String statusLine;
    private byte[] data;

    public String getStatusLine() {
        return statusLine;
    }

    public void setStatusLine(String statusLine) {
        this.statusLine = statusLine;
    }

    public byte[] getData() {
        return data;
    }

    public void setData(byte[] data) {
        this.data = data;
    }
}



解码器
package com.haole.mq.beanstalk.codec;

import com.haole.mq.beanstalk.command.Response;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.List;

/**
 * Created by shengjunzhao on 2017/5/28.
 */
public class CommandDecode extends ByteToMessageDecoder {

    private final static Logger log = LoggerFactory.getLogger(CommandDecode.class);
    private Charset charset;
    private String delimiter;

    public CommandDecode(Charset charset) {
        this(charset, "\r\n");
    }

    public CommandDecode(Charset charset, String delimiter) {
        this.charset = charset;
        this.delimiter = delimiter;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        in.markReaderIndex();
        int readableBytes = in.readableBytes();
        Response response = new Response();
        byte[] resp = new byte[readableBytes];
        in.readBytes(resp);
        log.debug("bytebuf in {}",resp);
        byte previous = 0;
        boolean isReset = true;
        for (int i = 0; i < readableBytes; i++) {
            byte current = resp[i];
            if (previous == 13 && current == 10) {
                String commandLine = new String(resp, 0, i - 1, charset);
                String[] spilts = commandLine.split(" ");
                String result = spilts[0];
                if ("RESERVED".equals(result) || "FOUND".equals(result) || "OK".equals(result)) {
                    String bytesStr = spilts[spilts.length - 1];
                    if (bytesStr.matches("\\d+")) {
                        int bytes = Integer.valueOf(bytesStr);
                        if (bytes == readableBytes - i - 1 - 2) {
                            byte[] data = new byte[bytes];
                            System.arraycopy(resp, i + 1, data, 0, bytes);
                            response.setData(data);
                            isReset = false;
                        }
                    } else
                        isReset = false;
                } else
                    isReset = false;
                response.setStatusLine(commandLine);
                break;
            }
            previous = current;
        }
        if (isReset)
            in.resetReaderIndex();
        else {
            out.add(response);
        }
    }
}



3. netty 处理器

处理器接收netty解码后的响应保存到阻塞队列中

package com.haole.mq.beanstalk.handler;

import com.haole.mq.beanstalk.command.Command;
import com.haole.mq.beanstalk.command.Response;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.LinkedBlockingQueue;


/**
 * Created by shengjunzhao on 2017/5/28.
 */
public class BeanstalkHandler extends ChannelInboundHandlerAdapter {

    private static final Logger log = LoggerFactory.getLogger(BeanstalkHandler.class);
    private LinkedBlockingQueue<Response> queue = new LinkedBlockingQueue<>();
    private Channel channel;



    public BeanstalkHandler() {}

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.channel=ctx.channel();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Response response = (Response) msg;
        queue.put(response);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }

    public Response sendMessage(Command command) throws InterruptedException {
        this.channel.writeAndFlush(command);
        return queue.take();
    }
}



此处使用了阻塞队列保存发送命令的响应。beanstalkd保证了按照顺序接收命令,并按照顺序处理命令。不会出现命令的响应与命令不一致的情况。 但是,netty无法保证请求和响应之间一一对应的关系,依赖于服务器的实现。如果服务器的实现是第三方的,例如像beanstalkd,则能保证一一对应;但是如果服务器对连接到自己的客户端channel的请求使用多线程来实现,则无法保证按照顺序实现一一对应,可能需要定义特殊的消息协议来保证。

4. netty的初始化

private void init() {
        b.group(group)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("beanstalk decode", new CommandDecode(Charset.forName("UTF-8")));
                        pipeline.addLast("beanstalk encode", new CommandEncode(Charset.forName("UTF-8")));
                        pipeline.addLast("beanstalk client handler", new BeanstalkHandler());

                    }
                });

    }



5. beanstalkd 命令发送
beanstalkd的命令有10多个,以put为例

/**
     * 向队列中插入一个job
     *
     * @param channel
     * @param priority 优先级 0~2**32的整数,最高优先级是0
     * @param delay    是一个整形数,表示将job放入ready队列需要等待的秒数
     * @param ttr      time to run—是一个整形数,表示允许一个worker执行该job的秒数。这个时间将从一个worker 获取一个job开始计算。
     *                 如果该worker没能在<ttr> 秒内删除、释放或休眠该job,这个job就会超时,服务端会主动释放该job。
     *                 最小ttr为1。如果客户端设置了0,服务端会默认将其增加到1。
     * @param data     及job体,是一个长度为<byetes> 的字符序列
     * @return 如果大于0,是新job的数字编号,如果小于0,错误,-1;未知;-2:BURIED;-3:EXPECTED_CRLF;-4:JOB_TOO_BIG;-5:DRAINING
     * @throws InterruptedException
     */
    public long put(Channel channel, int priority, int delay, int ttr, byte[] data) throws InterruptedException {
        Command putCommand = new PutCommand(priority, delay, ttr, data);
        Response response = channel.pipeline().get(BeanstalkHandler.class).sendMessage(putCommand);
        log.debug("response status {}", response.getStatusLine());
        String[] spilts = response.getStatusLine().split(" ");
        if ("INSERTED".equals(spilts[0])) {
            return Long.valueOf(spilts[1]).longValue();
        } else if ("BURIED".equals(spilts[0])) {
            return -2;
        } else if ("EXPECTED_CRLF".equals(spilts[0])) {
            return -3;
        } else if ("JOB_TOO_BIG".equals(spilts[0])) {
            return -4;
        } else if ("DRAINING".equals(spilts[0])) {
            return -5;
        } else
            return -1;
    }



6. 消息提供者和消费者的初始化和退出
在一个项目中可以单独使用提供者或者消费者,也可以同时使用。此客户端支持简单的beanstalkd集群,同时部署多个beanstalkd服务器,初始化时指定多个服务器的地址,连接是按照tube选择具体的服务器。

Set<String> servers = new HashSet<>();
        servers.add("192.168.209.132:11300");
        servers.add("192.168.209.133:11300");
        servers.add("192.168.209.134:11300");
        BeanstalkProvider provider1 = new DefaultBeanstalkProvider(servers, "beanstalks1");

BeanstalkConsumer consumer1 = new DefaultBeanstalkConsumer(servers, "beanstalks1");


初始化后,就可以使用provider或者consumer执行beanstalkd命令。
最后需要调用quit(),退出到服务器的连接

provider1.quit();
consumer1.quit();


具体实现已经上传到GitHub中,https://github.com/shengjunzhao/beanstalk4j
文中不足之处, 请各位大侠指正。
分享到:
评论
1 楼 smart152829 2017-07-01  

很不错的文章,最近看了看了夜行侠老师讲的Netty深入浅出源码剖析,里面用到了长连接高并发,在测试的时候,性能真的比之前的tomcat那些容器好几十倍

相关推荐

    基于Netty实现的MQTT客户端_netty-mqtt-client.zip

    基于Netty实现的MQTT客户端_netty-mqtt-client

    基于netty实现的mqtt客户端,可用于Java、Android环境.zip

    《基于Netty实现的MQTT客户端在Java与Android环境中的应用》 MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,广泛应用于物联网(IoT)领域,尤其是资源受限的设备之间。在本文中...

    java应用netty服务端和客户端

    Java应用程序中的Netty框架是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty广泛应用于分布式系统、云计算、大数据处理等领域,它的核心特性包括非阻塞I/O、...

    基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip

    基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip 基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip 基于 Netty 开发的 Java 游戏...

    Netty实现Java服务端和C#客户端联通

    标题中的“Netty实现Java服务端和C#客户端联通”是指使用Netty作为Java服务器框架,与C#客户端(使用DotNetty库)进行通信的一种技术实现。这涉及到跨平台的网络通信,以及两个不同编程语言间的交互。 Netty是Java...

    基于java netty的udp客户端声呐数据对接

    在"基于Java Netty的UDP客户端声呐数据对接"项目中,我们主要关注如何利用Netty处理UDP通信,以及如何解析和封装SCANFISH-II型声呐系统的数据。 UDP(User Datagram Protocol)是一种无连接的传输层协议,它不保证...

    JAVA版基于netty的物联网高并发智能网关.zip

    JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA...

    基于netty实现的mqtt客户端,可用于Java、Android环境_netty-mqtt-client.zip

    基于netty实现的mqtt客户端,可用于Java、Android环境_netty-mqtt-client

    基于Java的netty-mqtt MQTT 3.1.1协议服务端与客户端设计源码

    netty-mqtt是一个基于Java开发的MQTT 3.1.1协议服务端与客户端,包含113个文件,其中包括87个Java源文件、8个XML文件、7个Iml文件、3个YAML文件、3个JKS文件、2个Factories文件、1个LICENSE文件和1个Markdown文件。...

    Netty中实现多客户端连接与通信-以实现聊天室群聊功能为例示例代码.rar

    在本示例中,"HelloNetty"可能是包含启动服务器和客户端的Java源代码文件,其中包含了上述步骤的具体实现。通过阅读和理解这些代码,你可以更好地了解Netty如何处理多客户端连接和通信,以及如何构建一个简单的群聊...

    基于Netty的Java数据采集软件

    本文将深入探讨一个基于Netty的Java数据采集软件,它利用Netty强大的网络通信框架,实现了对大规模分布式数据节点的高效采集。 Netty是一个高性能、异步事件驱动的网络应用程序框架,广泛应用于服务器开发,特别是...

    Netty4编写服务器客户端,自定义编解码,发送自定义消息

    在这个项目中,我们将深入理解如何利用 Netty 4 来编写服务器和客户端,实现自定义的消息编解码,并进行通信。 首先,我们要创建一个自定义的消息类。这个消息类通常会包含必要的字段,比如消息头、消息体等,以...

    C# Netty 客户端,服务器端包含接双向接收

    标题中的"C# Netty 客户端,服务器端包含双向接收"揭示了这是一个关于使用C#语言实现基于Netty框架的客户端和服务器端通信的项目。Netty是Java平台上的一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可...

    java实现基于netty 的udp字节数据接收服务

    本示例关注的是如何利用Netty实现一个基于UDP(User Datagram Protocol)的数据接收服务,这在需要进行快速、无连接的数据传输场景中非常常见。以下是对这个主题的详细讲解: 首先,我们需要理解UDP的基础知识。UDP...

    基于netty编写的socket服务端

    综上所述,基于Netty编写的socket服务端利用了Java的NIO特性,通过事件驱动和处理器链的方式,实现了高效的网络通信。开发者可以根据需求定制自己的业务处理器,处理各种复杂的网络协议,实现高性能的网络服务。在...

    Netty实现简单的客户端服务端通信示例

    Netty 的设计基于 NIO(非阻塞 I/O),它利用了 Java 提供的 Channel、EventLoop 和 Buffer 等组件,提高了网络应用的性能和可扩展性。在 Netty 中,服务器称为 BossGroup,客户端连接称为 WorkerGroup,它们分别...

    基于 Java Netty实现的可用于内网穿透的代理工具.zip

    基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于...

    基于netty的服务器客户端收发消息代码

    在这个“基于Netty的服务器客户端收发消息代码”中,我们可以看到一个简单的实现,它演示了如何使用Netty进行双向通信,即服务器与客户端之间的消息交换。 首先,我们从服务器端(ChatServer)入手。服务器端通常...

    基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK。.zip

    【标题】中的“基于 Netty 开发的 Java 游戏服务端框架”指的是一个使用了 Netty 框架构建的、专为游戏服务端设计的高性能、高并发的 Java 应用框架。Netty 是一个异步事件驱动的网络应用框架,常用于创建高效的 TCP...

    基于netty实现的web框架

    【标题】:“基于Netty实现的Web框架” 在IT领域,构建高性能、高并发的网络应用是关键。Netty作为一个强大的异步事件驱动的网络应用框架,为Java开发者提供了高效且灵活的基础,使得构建Web框架变得更为便捷。本...

Global site tag (gtag.js) - Google Analytics