- 浏览: 653 次
- 性别:
- 来自: 北京
最近访客 更多访客>>
最新评论
-
smart152829:
很不错的文章,最近看了看了夜行侠老师讲的Netty深入浅出源码 ...
基于netty4的beanstalkd的java客户端实现
最近实现了基于netty4的beanstalkd的客户端, 实现此客户端的目的是为了学习netty。
beanstalkd是一个高性能、轻量级的分布式内存队列系统,个人认为,如果需要一个轻量型的 中间件, beanstalkd是很不错的一个选择,协议也很简单。beanstalkd的详细介绍 可见
https://wenku.baidu.com/view/b9654077f242336c1eb95e54.html 。
针对消息中间件的概念,分为消息的提供者和消息的消费者,结合beanstalkd和netty的概念,beanstalkd中tube与netty中的channel一一对应。为了在同一个项目中消息的提供者与消息的消费者不相互影响,相同名称的tube,提供者和消费者使用不同的channel。
1. beanstalk协议的封装
协议的封装参考了dinstone,首先定义接口表示协议的功能
提取公共部分到抽象类中
beanstalkd每一个协议对于一个具体的类,例如:put命令实现类
消费类reserve实现
2. 编码解码的实现
编码用于把beanstalkd的每一个协议转换为netty的ByteBuf
解码用于把netty的ByteBuf转换为beanstalkd协议的响应。beanstalkd协议的响应结构相同,定义一个列来表示
解码器
3. netty 处理器
处理器接收netty解码后的响应保存到阻塞队列中
此处使用了阻塞队列保存发送命令的响应。beanstalkd保证了按照顺序接收命令,并按照顺序处理命令。不会出现命令的响应与命令不一致的情况。 但是,netty无法保证请求和响应之间一一对应的关系,依赖于服务器的实现。如果服务器的实现是第三方的,例如像beanstalkd,则能保证一一对应;但是如果服务器对连接到自己的客户端channel的请求使用多线程来实现,则无法保证按照顺序实现一一对应,可能需要定义特殊的消息协议来保证。
4. netty的初始化
5. beanstalkd 命令发送
beanstalkd的命令有10多个,以put为例
6. 消息提供者和消费者的初始化和退出
在一个项目中可以单独使用提供者或者消费者,也可以同时使用。此客户端支持简单的beanstalkd集群,同时部署多个beanstalkd服务器,初始化时指定多个服务器的地址,连接是按照tube选择具体的服务器。
初始化后,就可以使用provider或者consumer执行beanstalkd命令。
最后需要调用quit(),退出到服务器的连接
具体实现已经上传到GitHub中,https://github.com/shengjunzhao/beanstalk4j
文中不足之处, 请各位大侠指正。
很不错的文章,最近看了看了夜行侠老师讲的Netty深入浅出源码剖析,里面用到了长连接高并发,在测试的时候,性能真的比之前的tomcat那些容器好几十倍
beanstalkd是一个高性能、轻量级的分布式内存队列系统,个人认为,如果需要一个轻量型的 中间件, beanstalkd是很不错的一个选择,协议也很简单。beanstalkd的详细介绍 可见
https://wenku.baidu.com/view/b9654077f242336c1eb95e54.html 。
针对消息中间件的概念,分为消息的提供者和消息的消费者,结合beanstalkd和netty的概念,beanstalkd中tube与netty中的channel一一对应。为了在同一个项目中消息的提供者与消息的消费者不相互影响,相同名称的tube,提供者和消费者使用不同的channel。
1. beanstalk协议的封装
协议的封装参考了dinstone,首先定义接口表示协议的功能
package com.haole.mq.beanstalk.command; import io.netty.buffer.ByteBuf; import java.nio.charset.Charset; /** * Created by shengjunzhao on 2017/5/27. */ public interface Command { String getCommandLine(); // 拼接协议的命令 ByteBuf prepareRequest(ByteBuf sendBuf, Charset charset, String delimiter); // 编码命令到ByteBuf中 }
提取公共部分到抽象类中
package com.haole.mq.beanstalk.command; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; import java.nio.charset.Charset; /** * Created by shengjunzhao on 2017/5/27. */ public class AbstractCommand implements Command { private String commandLine; @Override public String getCommandLine() { return this.commandLine; } @Override public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) { sendBuf.writeBytes(this.commandLine.getBytes(charset)); sendBuf.writeBytes(delimiter.getBytes(charset)); return sendBuf; } public void setCommandLine(String commandLine) { this.commandLine = commandLine; } }
beanstalkd每一个协议对于一个具体的类,例如:put命令实现类
package com.haole.mq.beanstalk.command; import io.netty.buffer.ByteBuf; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; /** * put command * Created by shengjunzhao on 2017/5/27. */ public class PutCommand extends AbstractCommand { private final static Logger log = LoggerFactory.getLogger(PutCommand.class); private byte[] data; public PutCommand(int priority, int delay, int ttr, byte[] data) { if (data == null) { throw new IllegalArgumentException("data is null"); } if (data.length > 65536) { throw new IllegalArgumentException("data is too long than 65536"); } setCommandLine("put " + priority + " " + delay + " " + ttr + " " + data.length); this.data = data; } @Override public ByteBuf prepareRequest(ByteBuf sendBuf,Charset charset, String delimiter) { sendBuf = super.prepareRequest(sendBuf,charset,delimiter); sendBuf.writeBytes(data); sendBuf.writeBytes(delimiter.getBytes(charset)); return sendBuf; } }
消费类reserve实现
package com.haole.mq.beanstalk.command; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Created by shengjunzhao on 2017/5/29. */ public class ReserveCommand extends AbstractCommand { private final static Logger log = LoggerFactory.getLogger(ReserveCommand.class); public ReserveCommand(long timeout) { if (timeout > 0) setCommandLine("reserve-with-timeout " + timeout); else setCommandLine("reserve"); } }
2. 编码解码的实现
编码用于把beanstalkd的每一个协议转换为netty的ByteBuf
package com.haole.mq.beanstalk.codec; import com.haole.mq.beanstalk.command.Command; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; /** * Created by shengjunzhao on 2017/5/28. */ public class CommandEncode extends MessageToByteEncoder<Command> { private static final Logger log = LoggerFactory.getLogger(CommandEncode.class); private Charset charset; private String delimiter; public CommandEncode(Charset charset) { this(charset, "\r\n"); } public CommandEncode(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } @Override protected void encode(ChannelHandlerContext ctx, Command msg, ByteBuf out) throws Exception { if (null == msg) { throw new Exception("The encode message is null"); } ByteBuf sendBuf = ctx.channel().alloc().buffer(512); log.debug("&&&&& command={}", msg.getCommandLine()); sendBuf = msg.prepareRequest(sendBuf, charset, delimiter); out.writeBytes(sendBuf); } }
解码用于把netty的ByteBuf转换为beanstalkd协议的响应。beanstalkd协议的响应结构相同,定义一个列来表示
package com.haole.mq.beanstalk.command; /** * Created by shengjunzhao on 2017/5/27. */ public class Response { private String statusLine; private byte[] data; public String getStatusLine() { return statusLine; } public void setStatusLine(String statusLine) { this.statusLine = statusLine; } public byte[] getData() { return data; } public void setData(byte[] data) { this.data = data; } }
解码器
package com.haole.mq.beanstalk.codec; import com.haole.mq.beanstalk.command.Response; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.nio.charset.Charset; import java.util.List; /** * Created by shengjunzhao on 2017/5/28. */ public class CommandDecode extends ByteToMessageDecoder { private final static Logger log = LoggerFactory.getLogger(CommandDecode.class); private Charset charset; private String delimiter; public CommandDecode(Charset charset) { this(charset, "\r\n"); } public CommandDecode(Charset charset, String delimiter) { this.charset = charset; this.delimiter = delimiter; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { in.markReaderIndex(); int readableBytes = in.readableBytes(); Response response = new Response(); byte[] resp = new byte[readableBytes]; in.readBytes(resp); log.debug("bytebuf in {}",resp); byte previous = 0; boolean isReset = true; for (int i = 0; i < readableBytes; i++) { byte current = resp[i]; if (previous == 13 && current == 10) { String commandLine = new String(resp, 0, i - 1, charset); String[] spilts = commandLine.split(" "); String result = spilts[0]; if ("RESERVED".equals(result) || "FOUND".equals(result) || "OK".equals(result)) { String bytesStr = spilts[spilts.length - 1]; if (bytesStr.matches("\\d+")) { int bytes = Integer.valueOf(bytesStr); if (bytes == readableBytes - i - 1 - 2) { byte[] data = new byte[bytes]; System.arraycopy(resp, i + 1, data, 0, bytes); response.setData(data); isReset = false; } } else isReset = false; } else isReset = false; response.setStatusLine(commandLine); break; } previous = current; } if (isReset) in.resetReaderIndex(); else { out.add(response); } } }
3. netty 处理器
处理器接收netty解码后的响应保存到阻塞队列中
package com.haole.mq.beanstalk.handler; import com.haole.mq.beanstalk.command.Command; import com.haole.mq.beanstalk.command.Response; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; /** * Created by shengjunzhao on 2017/5/28. */ public class BeanstalkHandler extends ChannelInboundHandlerAdapter { private static final Logger log = LoggerFactory.getLogger(BeanstalkHandler.class); private LinkedBlockingQueue<Response> queue = new LinkedBlockingQueue<>(); private Channel channel; public BeanstalkHandler() {} @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); this.channel=ctx.channel(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Response response = (Response) msg; queue.put(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } public Response sendMessage(Command command) throws InterruptedException { this.channel.writeAndFlush(command); return queue.take(); } }
此处使用了阻塞队列保存发送命令的响应。beanstalkd保证了按照顺序接收命令,并按照顺序处理命令。不会出现命令的响应与命令不一致的情况。 但是,netty无法保证请求和响应之间一一对应的关系,依赖于服务器的实现。如果服务器的实现是第三方的,例如像beanstalkd,则能保证一一对应;但是如果服务器对连接到自己的客户端channel的请求使用多线程来实现,则无法保证按照顺序实现一一对应,可能需要定义特殊的消息协议来保证。
4. netty的初始化
private void init() { b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("beanstalk decode", new CommandDecode(Charset.forName("UTF-8"))); pipeline.addLast("beanstalk encode", new CommandEncode(Charset.forName("UTF-8"))); pipeline.addLast("beanstalk client handler", new BeanstalkHandler()); } }); }
5. beanstalkd 命令发送
beanstalkd的命令有10多个,以put为例
/** * 向队列中插入一个job * * @param channel * @param priority 优先级 0~2**32的整数,最高优先级是0 * @param delay 是一个整形数,表示将job放入ready队列需要等待的秒数 * @param ttr time to run—是一个整形数,表示允许一个worker执行该job的秒数。这个时间将从一个worker 获取一个job开始计算。 * 如果该worker没能在<ttr> 秒内删除、释放或休眠该job,这个job就会超时,服务端会主动释放该job。 * 最小ttr为1。如果客户端设置了0,服务端会默认将其增加到1。 * @param data 及job体,是一个长度为<byetes> 的字符序列 * @return 如果大于0,是新job的数字编号,如果小于0,错误,-1;未知;-2:BURIED;-3:EXPECTED_CRLF;-4:JOB_TOO_BIG;-5:DRAINING * @throws InterruptedException */ public long put(Channel channel, int priority, int delay, int ttr, byte[] data) throws InterruptedException { Command putCommand = new PutCommand(priority, delay, ttr, data); Response response = channel.pipeline().get(BeanstalkHandler.class).sendMessage(putCommand); log.debug("response status {}", response.getStatusLine()); String[] spilts = response.getStatusLine().split(" "); if ("INSERTED".equals(spilts[0])) { return Long.valueOf(spilts[1]).longValue(); } else if ("BURIED".equals(spilts[0])) { return -2; } else if ("EXPECTED_CRLF".equals(spilts[0])) { return -3; } else if ("JOB_TOO_BIG".equals(spilts[0])) { return -4; } else if ("DRAINING".equals(spilts[0])) { return -5; } else return -1; }
6. 消息提供者和消费者的初始化和退出
在一个项目中可以单独使用提供者或者消费者,也可以同时使用。此客户端支持简单的beanstalkd集群,同时部署多个beanstalkd服务器,初始化时指定多个服务器的地址,连接是按照tube选择具体的服务器。
Set<String> servers = new HashSet<>(); servers.add("192.168.209.132:11300"); servers.add("192.168.209.133:11300"); servers.add("192.168.209.134:11300"); BeanstalkProvider provider1 = new DefaultBeanstalkProvider(servers, "beanstalks1"); BeanstalkConsumer consumer1 = new DefaultBeanstalkConsumer(servers, "beanstalks1");
初始化后,就可以使用provider或者consumer执行beanstalkd命令。
最后需要调用quit(),退出到服务器的连接
provider1.quit(); consumer1.quit();
具体实现已经上传到GitHub中,https://github.com/shengjunzhao/beanstalk4j
文中不足之处, 请各位大侠指正。
评论
1 楼
smart152829
2017-07-01
很不错的文章,最近看了看了夜行侠老师讲的Netty深入浅出源码剖析,里面用到了长连接高并发,在测试的时候,性能真的比之前的tomcat那些容器好几十倍
相关推荐
基于Netty实现的MQTT客户端_netty-mqtt-client
《基于Netty实现的MQTT客户端在Java与Android环境中的应用》 MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息协议,广泛应用于物联网(IoT)领域,尤其是资源受限的设备之间。在本文中...
Java应用程序中的Netty框架是一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty广泛应用于分布式系统、云计算、大数据处理等领域,它的核心特性包括非阻塞I/O、...
基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip 基于 Netty 开发的 Java 游戏服务端框架,目前提供 CocosCreator 和 Unity 的客户端SDK.zip 基于 Netty 开发的 Java 游戏...
标题中的“Netty实现Java服务端和C#客户端联通”是指使用Netty作为Java服务器框架,与C#客户端(使用DotNetty库)进行通信的一种技术实现。这涉及到跨平台的网络通信,以及两个不同编程语言间的交互。 Netty是Java...
在"基于Java Netty的UDP客户端声呐数据对接"项目中,我们主要关注如何利用Netty处理UDP通信,以及如何解析和封装SCANFISH-II型声呐系统的数据。 UDP(User Datagram Protocol)是一种无连接的传输层协议,它不保证...
JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA...
netty-mqtt是一个基于Java开发的MQTT 3.1.1协议服务端与客户端,包含113个文件,其中包括87个Java源文件、8个XML文件、7个Iml文件、3个YAML文件、3个JKS文件、2个Factories文件、1个LICENSE文件和1个Markdown文件。...
基于netty实现的mqtt客户端,可用于Java、Android环境_netty-mqtt-client
在本示例中,"HelloNetty"可能是包含启动服务器和客户端的Java源代码文件,其中包含了上述步骤的具体实现。通过阅读和理解这些代码,你可以更好地了解Netty如何处理多客户端连接和通信,以及如何构建一个简单的群聊...
基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于...
本文将深入探讨一个基于Netty的Java数据采集软件,它利用Netty强大的网络通信框架,实现了对大规模分布式数据节点的高效采集。 Netty是一个高性能、异步事件驱动的网络应用程序框架,广泛应用于服务器开发,特别是...
在这个项目中,我们将深入理解如何利用 Netty 4 来编写服务器和客户端,实现自定义的消息编解码,并进行通信。 首先,我们要创建一个自定义的消息类。这个消息类通常会包含必要的字段,比如消息头、消息体等,以...
本示例关注的是如何利用Netty实现一个基于UDP(User Datagram Protocol)的数据接收服务,这在需要进行快速、无连接的数据传输场景中非常常见。以下是对这个主题的详细讲解: 首先,我们需要理解UDP的基础知识。UDP...
标题中的"C# Netty 客户端,服务器端包含双向接收"揭示了这是一个关于使用C#语言实现基于Netty框架的客户端和服务器端通信的项目。Netty是Java平台上的一个高性能、异步事件驱动的网络应用程序框架,用于快速开发可...
综上所述,基于Netty编写的socket服务端利用了Java的NIO特性,通过事件驱动和处理器链的方式,实现了高效的网络通信。开发者可以根据需求定制自己的业务处理器,处理各种复杂的网络协议,实现高性能的网络服务。在...
Netty 的设计基于 NIO(非阻塞 I/O),它利用了 Java 提供的 Channel、EventLoop 和 Buffer 等组件,提高了网络应用的性能和可扩展性。在 Netty 中,服务器称为 BossGroup,客户端连接称为 WorkerGroup,它们分别...
本话题将探讨如何使用C++客户端与Java(通过Netty框架)服务器端实现TCP通讯,并涉及数据序列化工具Protocol Buffers(protobuf)在两者之间的交互。 首先,TCP(传输控制协议)是一种面向连接的、可靠的、基于字节...
在这个“基于Netty的服务器客户端收发消息代码”中,我们可以看到一个简单的实现,它演示了如何使用Netty进行双向通信,即服务器与客户端之间的消息交换。 首先,我们从服务器端(ChatServer)入手。服务器端通常...