`
QING____
  • 浏览: 2250695 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

Redis编程实践【protocol】

 
阅读更多

    Redis Protocol即为client与server交互时,所使用的数据格式;符合格式的数据能够被server端解析并返回结果,client端如果按照格式要求既可以解析“结果”并将结构化数据反馈给调用者。有些时候,我们可以通过改造协议的方式构建redis-client套层或者server端的Proxy。

    Redis-Client与Server之间进行的任何通讯,均是通过普通的TCP链接进行,因为TCP通讯是面向字节流的,因此它和其他任何基于字节流的信息交互的平台一样,需要“protocol”(协议);任何protocol本身需要至少具备2种能力:1)字节码成帧策略 2)字符序列格式约束。

    “字节码成帧”被广泛的应用在基于字节流的网络交互中,以TCP通讯为例,在Socket通讯中所发送的数据(特别是长链接,持续交付packet的场景下),对于socket的一端都需要知道每个packet的“终止符”位置,以及packet中每个数据field的偏移量;只有这样,对于socket的数据的接收端才能将“无边界流数据”有效的转化成结构化/可读的字符序列。如下展示一个packet帧的结构,其中"[""]"只是为了标记参数:

    [magic-header][packet-bytes-length][field-name-bytes-length][field-name-bytes][field-data-bytes-length][field-data-bytes]

    实际数据大概为:

    [“magic-header”][10][4]["name".getBytes()][6]["012345".getBytes()]

    不同的设计者可能考虑成帧的方式不同,但是都需要描述当前packet字节的长度/filed顺序/每个filed的字节长度等。上述例子则表达:此packet需要以“magic-header”开头(主要用来防止字节流被意外破坏或者乱序,同时用来表示一个新帧的开始),此后的总字节长度为10,其中“name”这个filed名称占4个字节,“name”这个filed对应的数据占6个字节;最终我们还原成name=012345这么一个信息。

    “字符序列格式约束”即将字符串按照一定的规则进行解析,并获得有效数据;其中“规则”就是格式约束,对于符合规则的字符串才能被接受和实施,否则将会被丢弃。比如xml,json等等,只有符合格式约束的字符串才能被相应的引擎所解析。

    Redis协议非常简单且容易理解,request和reply数据都遵循同一个协议;对于client端的每个request,最终会被描述为一个command,command所包含的信息只会包括:指令名称 + 参数列表。那么对于server端的response,最终会被描述为一个result,那么result可能包括此次操作的状态(status)码、错误信息、结果内容等。协议就是为双端交互中的数据格式提供约束。

    1. Request部分:对于指令操作[command-name][arg...]最终将会转换成如下格式在网络中传输并交付给server端:

 

*[参数数量] \r\n
$[参数字节个数]\r\n
[参数字节序列]
...
##以“SET key value”指令为例:
*3	#表示有3个参数
$3	#表示“参数”有三个字节("SET"字符串为3个字节)
SET
$4
name
$5
01234


##流的方式
*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$5\r\n01234\r\n
   

 

    2.Reply部分:server端需要返回结果的类型,redis中,reply的首个字符用来表示结果的类型,最终以“\r\n”结束.

  • “+”:“状态”类型reply,对于无需实际数据返回的相应,只是用来表示此次操作成功与否,例如SET KEY-VALUE指令。比如“+OK”表示操作成功,如果不成功将会返回“-ERROR”。
  • “-”:“异常”信息类型reply,对于操作失败时,将会返回此类型的信息告知client端。
    lrange testtt w x
    -ERR value is not an integer or out of range
     在“-”之后为“错误类型”,此后为一个空格或者“\r\n”,然后为异常信息内容,内容可能有多行。
  • “:”:表示返回结果为integer类型,此结果只包括一个数字,也可以用来表示true/false的结果类型,比如INCR/DECR/EXISTS/SISMEMBER等指令;需要注意的是,integer结果也是按照“字符串”方式传输的,你不能按照“4个字节=integer”的思路去使用它。比如integer = 23456,那么“23456”实际上是5个字节
    exists fck-me
    :0
    exists fck-you
    :1
  • “$”:表示返回的结果为普通数据结果,格式为:[$][字节个数][\r\n][字节序列][\r\n],如果结果中存在$-1则表示当前请求的数据为“null”。
    get fck-you
    $3
    123
    
  • “*”:表示返回的结果为复合数据结果,结果中包括多个子数据集合。
    lrange testlist 0 -1
    *2
    $1
    b
    $1
    a
    
     
    Redis支持“行内指令”,我们可以通过telnet的方式,执行指令和查看协议:
qing@qing-tp:~$ telnet 127.0.0.1 6379
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
SET test 1   
+OK
incr test
:2
exist test
-ERR unknown command 'exist'
 
     3.协议与实践:在下文的代码中,我们将通过代码展示如何直接通过Socket-IO的方式直接与redis-server交互,可以比较直观的明白protocol数据格式;其实,可以认为它是一个java redis-client,不过我本人采取了和Jedis不同的请求处理手段,本人采取了“请求队列 + 同步”的方式进行,其实如下代码可以在极少的修改下,改变为“异步”的方式。在此需要提醒:protocol中的字节流需要UTF-8编码之后。
TestMain.java
public class TestMain {

	public static void main(String[] args){
		Client client = new Client("127.0.0.1", 6379);
		client.set("testset", "012xyz中国_?");
		System.out.println(client.get("testset"));
		List<String> list = client.lrange("testlist", 0, -1);
		for(String item : list){
			System.out.println("--:" + item);
		}
		System.out.println("incr:" + client.incr("testincr"));
	}
}
 
Client.java(完整代码,参见附件)
public class Client {
	
	private BlockingQueue<Request> requests = new LinkedBlockingQueue<Request>();
	private Charset charset = Charset.forName("utf-8");
	private Handler handler;
	Client(String host,int port){
		handler = new Handler(host, port);
		handler.setDaemon(true);
		handler.start();
	}
	
	public void set(String key,String value){
		Request request = new Request(Command.SET, key,value);
		try{
			synchronized (request) {
				requests.put(request);
				request.wait();
			}
			Reply reply = request.reply;
			if(reply == null || reply.code == -1 || !reply.success){
				throw new RuntimeException("operation fail..");
			}
		}catch(InterruptedException e){
			return;
		}
	}	

	
	public String get(String key){
		Request request = new Request(Command.GET, key);
		try{
			synchronized (request) {
				requests.put(request);
				request.wait();
			}
			Reply reply = request.reply;
			if(reply == null || reply.code == -1 || !reply.success){
				throw new RuntimeException("operation fail..");
			}
			return reply.result;
			
		}catch(InterruptedException e){
			return null;
		}
	}
	
	public List<String> lrange(String key,int from,int to){
		Request request = new Request(Command.LRANGE, key,String.valueOf(from),String.valueOf(to));
		try{
			synchronized (request) {
				requests.put(request);
				request.wait();
			}
			Reply reply = request.reply;
			if(reply == null || reply.code == -1 || !reply.success){
				throw new RuntimeException("operation fail..");
			}
			return reply.lresult;
			
		}catch(InterruptedException e){
			return null;
		}
	}
	
	public Integer incr(String key){
		Request request = new Request(Command.INCR,key);
		try{
			synchronized (request) {
				requests.put(request);
				request.wait();
			}
			Reply reply = request.reply;
			if(reply == null || reply.code == -1 || !reply.success){
				throw new RuntimeException("operation fail..");
			}
			return Integer.valueOf(reply.result);
			
		}catch(InterruptedException e){
			return null;
		}
	}

	public void close(){
		handler.close();
	}
	
	class Handler extends Thread{
		Socket socket = null;
		boolean closed = false;
		BufferedReader is = null;
		OutputStream os = null;
		String host;
		int port;
		Handler(String host,int port){
			try{
				this.host = host;
				this.port = port;
				connect();
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		
		private void connect() throws IOException{
			socket = new Socket();
			SocketAddress addr = new InetSocketAddress(host,port);
			socket.setKeepAlive(true);
			//socket.setSoTimeout(10000);
			socket.setSoLinger(true,0);
			//socket.setReceiveBufferSize(1024);
			socket.setTcpNoDelay(true);
			socket.connect(addr,10000); //blocking
			is = new BufferedReader(new InputStreamReader(socket.getInputStream(),charset));
			os = socket.getOutputStream();
		}
		
		public void close(){
			closed = true;
			this.interrupt();
		}
		
		
		private void write(Request request) throws IOException{
			os.write('*');
			String[] args = request.args;
			os.write(String.valueOf(args.length + 1).getBytes(charset));
			os.write('\r');
			os.write('\n');
			//*2
			os.write('$');
			byte[] cb = request.command.name().getBytes(charset);
			os.write(String.valueOf(cb.length).getBytes(charset));
			os.write('\r');
			os.write('\n');
			//$3
			os.write(cb);
			os.write('\r');
			os.write('\n');
			//GET
			for(String arg : args){
				byte[] ab = arg.getBytes(charset);
				os.write('$');
				os.write(String.valueOf(ab.length).getBytes(charset));
				os.write('\r');
				os.write('\n');
				os.write(ab);
				os.write('\r');
				os.write('\n');
			}
		}
		
		@Override
		public void run(){
			try{
				while(!closed){
					Request request = requests.take();
					try{
						write(request);
						char status = (char)is.read();
						Reply reply = new Reply();
						if(status != '-'){
							reply.success = true;
						}
						if(status == '+' || status == '-'){
							reply.message = read();
						}else if(status == '$'){
							reply.result = readString();
						}else if(status == '*'){
							reply.lresult = readMulti();
						}else if(status == ':'){
							reply.result = read();
						}else{
							request.reply = new Reply(-1);
							throw new RuntimeException("packet error..");
						}
						synchronized (request) {
							request.reply = reply;
							request.notifyAll();
						}
					}catch(Exception e){
						try{
							socket.close();
							this.connect();
						}catch(Exception ex){
							//
						}
						synchronized (request) {
							request.notifyAll();
						}
					}
				}
			}catch(InterruptedException e){
				try{
					closed = true;
					socket.close();
					for(Request request : requests){
						request.blocker.interrupt();
					}
					requests.clear();
				}catch(Exception ex){
					//
				}
			}
		}
		//read line
		private String read() throws IOException{
			StringBuilder sb = new StringBuilder();
			//\r\n必须互为成对
			//不能直接使用is.readline()
			boolean lfcr = false;
			while(true){
				char _char = (char)is.read();
				if(_char == -1){
					close();
					break;
				}
				//如果上一个字符为\r
				if(lfcr == true){
					if(_char == '\n'){
						break;
					}
					sb.append('\r');
					lfcr = false;
				}
				if(_char == '\r'){
					lfcr = true;
					continue;
				}
				sb.append(_char);
			}
			return sb.toString();
		}
		
		private List<String> readMulti() throws IOException{
			Integer size = Integer.valueOf(read());
			List<String> lresult = new ArrayList<String>();
			//eg: *3
			if(size > 0) {
				for(int i=0;i<size;i++){
					while(true){
						char _char = (char)is.read();//$3
						if(_char == '$'){
							lresult.add(readString());
							break;
						}
					}
				}
			}
			return lresult;
		}
		//such as:
		//$3
		//012
		private String readString() throws IOException{
			Integer size = Integer.valueOf(read());
			//-1 is null
			if(size > 0){
				return read();
			}
			return null;
		}
	}
}
 

   

分享到:
评论

相关推荐

    Redis协议客户端模块.rar

    Redis使用简单明了的RESP(REdis Serialization Protocol)协议,它设计的目标是高效且易于解析。RESP支持多种数据类型,如简单字符串、错误、整数、浮点数、大整数和数组,使得客户端可以方便地将数据发送到服务器...

    这是redis实战里面的代码转换成为go语言实现.zip

    这个“myredis-master”项目提供了一个很好的学习和实践Go语言实现Redis客户端的机会,可以帮助开发者深入理解Redis协议和Go语言的并发编程。通过阅读和分析源码,可以进一步提升在分布式系统和网络编程方面的技能。

    javaredis源码-anatomy-lettuce:庖丁解架构之RedisJava客户端Lettuce架构解剖和源码精读相关代码

    - **Redis协议解析(Redis Protocol)**:Lettuce支持Redis的多种协议,如RESP(Redis Simple String Protocol),可以高效地解析和构建Redis命令。 3. **源码精读** - **CommandHandler**:处理来自Netty的响应...

    Java相关技术总结,包括redis,MySQL,RabbitMq,面试题总结,源码解读

    RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的消息中间件,它提供了一种可靠的消息传递机制,使应用程序可以通过异步处理来提高可扩展性和解耦性。在Java中,我们可以使用RabbitMQ的Java客户端API,...

    rabbitmq、spring、synchronized、redis面试题

    在IT行业中,面试题往往揭示了开发者需要...以及熟练运用Redis进行数据存储和缓存,理解其在高并发环境下的优势和最佳实践。这四个技术点构成了现代企业级应用开发中的基础架构,对于提升系统的稳定性和性能至关重要。

    Java相关知识总结,包括Java基础、MySQL、Springboot、MyBatis、Redis、RabbitMQ等

    Java是一种广泛使用的面向对象的编程语言,以其跨平台、健壮性和安全性著称。本总结将涵盖Java的基础知识,MySQL数据库的使用,Spring Boot框架,MyBatis持久层框架,Redis内存数据库,以及RabbitMQ消息队列系统。 ...

    企业级应用项目,springmvc+nutz+redis+rabbitmq+quartz+shiro

    在本项目中,"企业级应用项目,springmvc+nutz+redis+rabbitmq+quartz+shiro",开发者采用了一系列高级技术构建了一个具备高可扩展性和低耦合度的系统,旨在提供一个适用于有一定Java基础的学习者进行实践和进阶的...

    Sqlite3+RabbitMQ+Celery Python从零开始搭建一个生产者消费者服务模型配套安装文件

    RabbitMQ是基于Erlang语言开发的消息中间件,它是AMQP(Advanced Message Queuing Protocol)协议的实现,支持多种编程语言,包括Python。RabbitMQ的核心功能是接收和转发消息,使得生产者和消费者之间解耦,提高了...

    网络连接库

    在IT领域,网络连接库是实现应用程序间通信的关键组件,特别是在服务器端开发中。网络库通常包括各种协议的实现,如socket...开发者可以通过阅读代码、调试和实践,逐步掌握这些关键技术点,从而在实际项目中游刃有余。

    jdk,tomcat,mysql,rabbitmq部署文档和视频.zip

    它是Java编程语言的开发和运行环境,包含了Java虚拟机(JVM)、编译器(javac)、Java类库以及各种工具。安装JDK是进行Java开发的第一步。通常,我们需要下载对应操作系统的安装包,然后按照安装向导进行操作。安装...

    java源码剖析-jedis-sr:JedisforRedisJavaclient源码剖析笔记

    通过对Jedis源码的深入学习,可以了解到如何通过配置和编程实践来优化Redis操作的性能,如合理设置超时时间、避免阻塞操作、正确使用连接池等。 总之,Jedis的源码分析对于Java开发者来说是一份宝贵的资源,它不仅...

    JavaLearning.rar

    RabbitMQ是基于AMQP(Advanced Message Queuing Protocol)的消息中间件,用于异步处理和解耦系统组件。Java中通过RabbitMQ的Java客户端库可以方便地发送和接收消息。理解RabbitMQ的工作原理,如交换机、队列、绑定...

    kettle rabbitmq 插件开发

    10. **最佳实践**:遵循良好的编程和设计原则,例如代码复用、模块化设计,以及使用适当的工具和框架来提高开发效率和可维护性。 通过了解这些知识点,开发者可以创建一个定制的 Kettle RabbitMQ 插件,实现数据的...

    BBS邮箱web设计

    4. **电子邮件协议**:Web邮箱需要支持SMTP(Simple Mail Transfer Protocol)发送邮件,POP3或IMAP4(Post Office Protocol或Internet Message Access Protocol)接收邮件。开发者需要理解这些协议的工作原理,并能...

    springMVC+RabbitMQ+websocket

    同时,提到的“redis服务”集成,意味着项目可能进一步涉及到缓存管理,增加数据处理的效率和性能。总的来说,这个项目提供了一个全面的实践平台,对于提升开发者在企业级应用开发中的综合能力非常有益。

    Java毕业设计邮件管理系统

    4. **邮件协议**:为了实现邮件的发送和接收,系统需要支持SMTP(Simple Mail Transfer Protocol)和POP3(Post Office Protocol)或IMAP(Internet Message Access Protocol)。JavaMail API是一个常用的Java库,...

    即时通讯源代码即时通讯源代码

    开发者在构建即时通讯服务器时,可能需要熟悉如Java、C++或Node.js等后端编程语言,并掌握数据库设计,如MySQL、MongoDB或Redis,以存储用户信息和聊天记录。 客户端是用户与即时通讯系统交互的界面,通常包括桌面...

    hash.zip_中间件编程_Java_

    在IT行业中,中间件编程是连接应用程序和操作系统、网络、数据库等基础设施的关键技术。Java作为一门广泛使用的编程语言,...同时,压缩包中的文件提供了可能的辅助资料,帮助我们更深入地理解中间件编程的实践和理论。

    基于Pomelo框架的分布式游戏服务器设计.pdf

    文章中还提到了游戏通信数据交换的格式,采用了轻便高效的协议,如Google的Protocol Buffers,它是一种语言无关、平台无关的可扩展机制,用于序列化结构化数据。它比JSON或XML更加高效,因为它以二进制形式存储数据...

Global site tag (gtag.js) - Google Analytics