`
halloffame
  • 浏览: 55983 次
  • 性别: Icon_minigender_1
  • 来自: 广州
社区版块
存档分类
最新评论

Apache Thrift 初学小讲(五)【代理】

阅读更多

WEB中我们很常见的一种部署方式是在几个tomcat前面加一个nginx做反向代理,此时的nginx有了负载均衡和路由网关的功能。nginx工作在http层,thirft服务工作在tcp层上,所以不能用nginx作为thirft服务的代理(据说nginx可以装一个插件来支持tcp层)。tcp层上的有一个开源的叫HAProxy,用成熟的开源软件有好处,受限制也比较大,本节是用3种方式实现thirft代理,可以更灵活的实现一些自定义功能等。

 

1 用经典的socket实现:api写起来代码比较简单,就做做转发,但是因为不解析数据协议,且read和accept是阻塞的,所以一个连接会需要两个线程,高并发时,这个缺陷会非常严重。

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketProxy {
	public static void main(String[] args) throws Exception { 
		ServerSocket serverSocket = null;
		try {
			serverSocket = new ServerSocket(4567);
			System.out.println("Starting the proxy on port 4567...");
			while (true) {
				Socket socket = serverSocket.accept();
				//可改成线程池提升性能
				new SocketThread(socket).start();
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			serverSocket.close();
		}
	}
    
	static class SocketThread extends Thread {
		private Socket socketIn = null;
		
		public SocketThread(Socket socket){
			socketIn = socket;
		}
		
    	public void run() {
    		//可改成缓冲流BufferedInputStream和BufferedOutputStream提升性能
    		InputStream isIn = null;
			OutputStream osIn = null;
			
    		Socket socketOut = null;
    		InputStream isOut = null; 
    		OutputStream osOut = null;
			
    		try {
    			isIn = socketIn.getInputStream();
				osIn = socketIn.getOutputStream();
				
				//真正的服务在9090端口
    			socketOut = new Socket("127.0.0.1", 9090); 
    			isOut = socketOut.getInputStream();  
    			osOut = socketOut.getOutputStream();
    			
    			//可改成线程池提升性能
    			new StreamThread(isOut, osIn).start();
    			
				byte[] buffer = new byte[1024];
				int temp = 0;		
				while ((temp = isIn.read(buffer)) != -1) {
					System.out.println("send:" + new String(buffer, 0, temp, "UTF-8"));
					osOut.write(buffer, 0, temp); 
				}
				
				System.out.println("server end");
			} catch (Exception e) {
				e.printStackTrace();
			} finally {
				closeAll(socketIn, isIn, osIn, socketOut, isOut, osOut);
			}
    	}
    	
    	private void closeAll(Socket socketIn, InputStream isIn, OutputStream osIn, 
    			Socket socketOut, InputStream isOut, OutputStream osOut) {
    		try {
				isIn.close();
				osIn.flush();
				osIn.close();
				socketIn.close();
				
				isOut.close();
				osOut.flush();
				osOut.close();
				socketOut.close();
			} catch (Exception e) {
				e.printStackTrace();
			}
    	}
    }
	
	static class StreamThread extends Thread {
		private InputStream is = null;
		private OutputStream os = null;
		
		public StreamThread(InputStream inputStream, OutputStream outputStream) {
			is = inputStream;
			os = outputStream;
		}
		
		public void run() {
			byte[] buffer = new byte[1024];
			int temp = 0;
			try {
				while ((temp = is.read(buffer)) != -1) {
					System.out.println("receive:" + new String(buffer, 0, temp, "UTF-8"));
					os.write(buffer, 0, temp); 
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 

2 针对1的缺点,用nio的api实现:api写起来比较恶心,只有一个线程处理一切操作,当然实际应用时当然不会只有一个线程,可以参考thirft的TThreadedSelectorServer是怎么做的,一个线程也非常危险,当一个连接出现问题抛异常的时候,整个程序down掉,其它的连接也一并会被影响。

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioProxy {
    private static final int Listenning_Port = 4567;
    private static final int Buffer_Size = 1024;
    private static final int TimeOut = 3000;
    
    public static void main(String[] args) throws Exception {
        // 创建一个在本地端口进行监听的服务Socket信道并设置为非阻塞方式
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(Listenning_Port));
        serverChannel.configureBlocking(false);
        System.out.println("Starting the proxy on port " + Listenning_Port + "...");
        
        // 创建一个选择器并将serverChannel注册到它上面
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        while (true) {
            // 等待某个信道就绪
            if( selector.select(TimeOut) == 0 ){
                System.out.println(".");
                continue;
            }
            
            // 获得就绪信道的键迭代器
            Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
            // 使用迭代器进行遍历就绪信道
            while (keyIter.hasNext()) {
                SelectionKey key = keyIter.next();
                
                // 这种情况是有客户端连接过来,准备一个Channel与之通信,并关联一个真正服务的Channel
                if (key.isValid() && key.isAcceptable()) { 
                	SocketChannel inChannel = ((ServerSocketChannel)key.channel()).accept();
                	inChannel.configureBlocking(false);
                    SelectionKey icKey = inChannel.register(key.selector(), SelectionKey.OP_READ);
                    
                    SocketChannel outChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9090)); 
                    outChannel.configureBlocking(false);
                    SelectionKey ocKey = outChannel.register(selector, SelectionKey.OP_READ); 
                    
                    //互相关联
                    icKey.attach(ocKey);
                    ocKey.attach(icKey);
                }
                
                // 客户端有写入时
                if (key.isValid() && key.isReadable()) {
                    // 获得与客户端通信的信道
                    SocketChannel channel = (SocketChannel)key.channel();
                    SelectionKey relateKey = (SelectionKey)key.attachment(); 
                    SocketChannel relateChannel = (SocketChannel)relateKey.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(Buffer_Size);
                    buffer.clear();
                    
                    // 读取信息获得读取的字节数
                    long bytesRead = channel.read(buffer);
                    if (bytesRead == -1) {
                    	// 没有读取到内容的情况
                    	channel.close();
                    	relateChannel.close();
                    } else {
                        // 将缓冲区准备为数据传出状态
                        buffer.flip();
                        relateChannel.write(buffer);
                        // 设置为下一次读取或是写入做准备
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                }
                
                keyIter.remove();
            }
        }      
    }
}

 

3 用thirft的api实现:这个方式可以利用thirft本身的成熟的东西,可以解析数据包,做做认证或者过滤之类的功能。查看thirft的源码,从TSimpleServer的serve方法看进去,我们看到读数据,处理数据,写数据是在processor的process方法里,因此我们可以不用thrift生成的processor,自定义一个万能的processor,接收各种类型的数据再转发出去。

import java.util.ArrayList;
import java.util.List;

import org.apache.thrift.TApplicationException;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TField;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.protocol.TList;
import org.apache.thrift.protocol.TMap;
import org.apache.thrift.protocol.TMessage;
import org.apache.thrift.protocol.TMessageType;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.protocol.TProtocolUtil;
import org.apache.thrift.protocol.TSet;
import org.apache.thrift.protocol.TStruct;
import org.apache.thrift.protocol.TType;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFastFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportFactory;

public class ThirftProxy {
	public static void main(String [] args) throws Exception {
		//自定义的一个processor,非生成代码
		ProxyProcessor proxyProcessor = new ProxyProcessor();	
        //指定的通信协议
        TProtocolFactory tProtocolFactory = new TJSONProtocol.Factory(); 
        //指定的通信方式
        TTransportFactory tTransportFactory = new TFastFramedTransport.Factory(); 

        int port = 4567;
    	TNonblockingServerSocket tNonblockingServerSocket =
    			new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(port));
    	TThreadedSelectorServer.Args tThreadedSelectorServerArgs
          	= new TThreadedSelectorServer.Args(tNonblockingServerSocket);
    	tThreadedSelectorServerArgs.processor(proxyProcessor);
    	tThreadedSelectorServerArgs.protocolFactory(tProtocolFactory);
    	tThreadedSelectorServerArgs.transportFactory(tTransportFactory);
    	//指定的服务器模式	
    	TServer serverEngine = new TThreadedSelectorServer(tThreadedSelectorServerArgs);
        System.out.println("Starting the proxy on port " + port + "...");
        serverEngine.serve();
	}
	
	static class ProxyProcessor implements TProcessor {
		@Override
		public boolean process(TProtocol in, TProtocol out) throws TException {
			TMessage msg = in.readMessageBegin();
			List<ProxyStruct> inDatas = readData(in);
			in.readMessageEnd();
			
			//转发请求到9090端口
			TSocket socket = new TSocket("127.0.0.1", 9090);
	        socket.setTimeout(1000);
	        TTransport transport = socket;
	        transport = new TFastFramedTransport(transport);
		    TProtocol tProtocol = new TJSONProtocol(transport);
		    if ( !transport.isOpen() ) {
	        	transport.open();
	        }
		    
		    tProtocol.writeMessageBegin(msg);
		    wirteData(tProtocol, inDatas); 
		    tProtocol.writeMessageEnd();
		    tProtocol.getTransport().flush();
		    
		    int seqid = msg.seqid;
		    String methodName = msg.name;
		    msg = tProtocol.readMessageBegin();
		    if (msg.type == TMessageType.EXCEPTION) {
		        TApplicationException x = TApplicationException.read(tProtocol);
		        tProtocol.readMessageEnd();
		        throw x;
		    }
		    if (msg.seqid != seqid) {
		        throw new TApplicationException(TApplicationException.BAD_SEQUENCE_ID, methodName + " failed: out of sequence response");
		    }
			inDatas = readData(tProtocol); 
			tProtocol.readMessageEnd();
			
			out.writeMessageBegin(msg);
		    wirteData(out, inDatas); 
		    out.writeMessageEnd();
		    out.getTransport().flush();
			
			return true;
		}
		
		private void wirteData(TProtocol out, List<ProxyStruct> outDatas) throws TException {
			out.writeStructBegin(new TStruct(""));
		    for (ProxyStruct outData : outDatas) {
		    	TField field = outData.field;
		    	Object value = outData.value;
		    	out.writeFieldBegin(field);
		    	
		    	switch (field.type) {
					case TType.VOID:
						break;
					case TType.BOOL:
						out.writeBool((boolean)value);
						break;
					case TType.BYTE:
						out.writeByte((byte)value);
						break;
					case TType.DOUBLE:
						out.writeDouble((double)value);
						break;
					case TType.I16:
						out.writeI16((short)value);
						break;
					case TType.I32:
						out.writeI32((int)value);
						break;
					case TType.I64:
						out.writeI64((long)value);
						break;
					case TType.STRING:
						out.writeString((String)value);
						break;
					case TType.STRUCT:
						wirteData(out, (List<ProxyStruct>)value);
						break;
					case TType.MAP:
						//out.writeMapBegin((TMap)value);
						//out.writeMapEnd();
						break;
					case TType.SET:
						//out.writeSetBegin((TSet)value);
						//out.writeSetEnd();
						break;
					case TType.LIST:
						//out.writeListBegin((TList)value);
						//out.writeListEnd();
						break;
					case TType.ENUM:
						break;
					default:
				}
		    	
		    	out.writeFieldEnd();
		    }
		    out.writeFieldStop();
			out.writeStructEnd();
		}
		
		private List<ProxyStruct> readData(TProtocol in) throws TException {
			List<ProxyStruct> inDatas = new ArrayList<ProxyStruct>();
			TField schemeField;
			in.readStructBegin();
			while (true) {
				schemeField = in.readFieldBegin();
				if (schemeField.type == TType.STOP) { 
					break;
				}
				ProxyStruct inData = null;
				
				switch (schemeField.type) {
					case TType.VOID:
						TProtocolUtil.skip(in, schemeField.type);
						break;
					case TType.BOOL:
						inData = new ProxyStruct(schemeField, in.readBool());
						break;
					case TType.BYTE:
						inData = new ProxyStruct(schemeField, in.readByte());
						break;
					case TType.DOUBLE:
						inData = new ProxyStruct(schemeField, in.readDouble());
						break;
					case TType.I16:
						inData = new ProxyStruct(schemeField, in.readI16());
						break;
					case TType.I32:
						inData = new ProxyStruct(schemeField, in.readI32());
						System.out.println("I32-->" + inData.value); 
						break;
					case TType.I64:
						inData = new ProxyStruct(schemeField, in.readI64());
						break;
					case TType.STRING:
						inData = new ProxyStruct(schemeField, in.readString());
						System.out.println("STRING-->" + inData.value);
						break;
					case TType.STRUCT:
						inData = new ProxyStruct(schemeField, readData(in));
						break;
					case TType.MAP:
						//inData = new ProxyStruct(schemeField, in.readMapBegin());
						/**
						 * 这里我懒了,不想写了,readMapBegin返回的TMap对象有3个字段
						 * keyType,valueType,size,没错就是map的key的类型,value的类型,map的大小
						 * 从0到size累计循环的按类型读取key和读取value,构造一个hashmap就可以了
						 */
						//in.readMapEnd();
						break;
					case TType.SET:
						//inData = new ProxyStruct(schemeField, in.readSetBegin());
						//同理MAP类型
						//in.readSetEnd();
						break;
					case TType.LIST:
						//inData = new ProxyStruct(schemeField, in.readListBegin());
						//同理MAP类型
						//in.readListEnd();
						break;
					case TType.ENUM:
						//Enum类型传输时是个i32
						TProtocolUtil.skip(in, schemeField.type);
						break;
					default:
						TProtocolUtil.skip(in, schemeField.type);
				}
				if (inData != null ) inDatas.add(inData);
				
				in.readFieldEnd();
			}
			in.readStructEnd();
			
			return inDatas;
		}
		
	}
	
	//用来存储读取到的各种类型的字段
	static class ProxyStruct {
		public TField field;
		public Object value;
		
		public ProxyStruct(TField tField, Object object) {
			field = tField;
			value = object;
		}
	}
}

 

注意:这种方式无法支持TTupleProtocol,查看源码知道TTupleProtocol继承了TCompactProtocol,TTupleProtocol编码时为了省空间,没有字段的元信息(id,名字,类型等),只有一个bitset表明哪几个field有值,然后直接用生成的代码读取这些有值的field。TTupleProtocol这种编码方式有个缺点就是服务版本间不兼容。

 

工程文件结构


 

依次运行TestServer,SocketProxy/NioProxy/ThirftProxy,TestClient,结果如预期。

 

推荐一些比较好的相关文章:

通信协议之序列化 http://blog.chinaunix.net/uid-27105712-id-3266286.html

Apache Thrift设计概要 http://calvin1978.blogcn.com/articles/apache-thrift.html

Java NIO浅析 https://zhuanlan.zhihu.com/p/23488863

 

补充:

jdk1.7后有了aio,和nio有什么不同呢,nio是同步非阻塞的,意思是在真正read数据的时候是阻塞的,只是通过循环信道,事件机制和buffer达到并发的目的,所以对于那些读的数据比较大,读写过程时间长的,nio就不太适合。而aio是异步非阻塞的,read到数据后是通过回调通知相应的方法来继续后面的逻辑,类似于js的callback,所以aio能够胜任那些重量级,读写过程长的任务。写了一个例子AioProxy.java,已经放在附件里了。

 

java中的AIO https://www.jianshu.com/p/c5e16460047b

[高并发Java 八] NIO和AIO https://my.oschina.net/hosee/blog/615269

 

感谢阅读!附件是eclipse工程源码。

  • 大小: 31.8 KB
2
0
分享到:
评论

相关推荐

    Apache Thrift 初学小讲(七)【负载均衡】

    在本篇初学小讲中,我们将重点关注Thrift在负载均衡方面的应用,这对于构建大型分布式系统至关重要。Thrift通过提供一种高效的数据序列化机制以及RPC(远程过程调用)接口,使得不同编程语言之间可以轻松地进行通信...

    Apache Thrift 初学小讲(六)【spring】

    在本篇小讲中,我们将探讨如何将Thrift与Spring框架结合,以便于构建微服务架构。 首先,让我们了解Thrift的基本工作原理。Thrift IDL(接口定义语言)允许开发者声明服务方法和数据类型,类似于Java中的接口或C++...

    Apache Thrift 初学小讲(三)【http】

    在“Apache Thrift 初学小讲(三)【http】”中,我们将深入探讨Thrift如何与HTTP协议相结合,实现基于HTTP的服务通信。 首先,Thrift 提供了一个名为 `ThriftServlet` 的组件,它是将Thrift服务与Java Servlet容器...

    Apache Thrift 初学小讲(八)【zookeeper实现服务注册与发现】

    Apache Thrift 是一个开源的软件框架,用于构建可伸缩的、跨语言的服务。它结合了接口定义语言(IDL)和库,允许开发者定义数据结构和服务接口,然后自动生成多种编程语言的代码,使得不同语言之间可以高效地进行...

    用C#和C++写的Apache Thrift的小范例

    本例改编自Apache Thrift教程: http://mikecvet.wordpress.com/2010/05/13/apache-thrift-tutorial-the-sequel/ http://chanian.com/2010/05/13/thrift-tutorial-a-php-client/ 原教程使用的是c++ server和...

    thrift-Demo

    Thrift是一种开源的跨语言服务开发框架,由Facebook于2007年开发并开源,后来成为Apache软件基金会的顶级项目。Thrift的核心思想是通过定义一种中间描述文件(.thrift),来实现数据结构和服务接口的跨语言共享。这...

    THRIFT 学习资料

    THRIFT 是一个开源的跨语言服务开发框架,由 Facebook 在 2007 年创建并贡献给了 Apache 基金会。它旨在提供一种高效、可扩展、跨平台的解决方案,用于构建分布式系统中的应用程序接口(API)。通过 THRIFT,开发者...

    thrift入门学习教程

    Thrift是由Facebook开发并在2007年贡献给Apache基金会的一款开源项目。它的主要目的是解决跨平台、跨语言的系统间大数据量传输通信问题。在早期,Facebook面临着系统之间语言环境不一致的问题,因此需要一种能够实现...

    thrift1 查询hbase

    在IT领域,尤其是在大数据处理和分布式系统中,HBase是一个重要的NoSQL数据库,它基于Apache Hadoop并提供了高性能、列式存储、可扩展的实时读写能力。而Thrift是一种跨语言的服务框架,由Facebook开发,它允许不同...

    Java Thrift demo例子

    Thrift是一种开源的跨语言服务开发框架,由Facebook于2007年开源,现由Apache基金会维护。它允许开发者定义服务接口,然后自动生成多种编程语言的代码,使得不同语言之间可以进行高效、可靠的RPC(远程过程调用)...

    thrift-0.9.1

    Thrift-0.9.1 是一个开源的跨语言服务开发框架,由Facebook于2007年开源,后来成为了Apache软件基金会的顶级项目。这个版本的Thrift源码包包含了一系列的工具和库,使得开发者能够高效地构建分布式系统。Thrift的...

    thrift c++ php

    Thrift是一种开源的跨语言服务开发框架,最初由Facebook开发并贡献给Apache基金会。它允许开发者定义服务接口和数据类型,然后自动生成多种编程语言的代码,使得在不同语言之间进行高效、可靠的通信成为可能。在这个...

    【Thrift之C++远程调用helloworld菜鸟教程】

    Thrift是一种跨语言的服务开发框架,最初由Facebook开发,现在是Apache软件基金会的项目。它允许程序员定义服务接口,然后自动生成多种编程语言的代码,使得在这些不同的语言之间进行远程过程调用(RPC)变得容易。...

    Thrift和Avro实例

    Thrift和Avro是两种广泛使用的数据序列化和远程过程调用(RPC)框架,它们在分布式系统中扮演着重要角色。在这个实例中,我们将深入理解这两种技术,并探讨它们各自的特性和应用场景。 Thrift是由Facebook开发的一...

    thrift.demo.rar

    Thrift是一种开源的跨语言服务开发框架,由Facebook于2007年开发并开源,后来成为Apache软件基金会的顶级项目。它通过定义一种中间语言(IDL,Interface Definition Language)来描述服务接口,允许开发者在不同的...

    Thrift服务开发框架 v0.16.0.gz

    Thrift服务开发框架v0.16.0是一款强大的跨语言服务开发工具,它由Facebook开源,现由Apache基金会维护。这个版本的Thrift提供了一种高效、灵活且可扩展的方式来构建分布式系统。Thrift的核心理念是通过定义一种中间...

    apache zeppelin使用文档

    ### Apache Zeppelin 使用指南 #### 一、Apache Zeppelin 概览 Apache Zeppelin 是一款功能强大的基于 Web 的 Notebook 服务器,它为数据科学家提供了一个交互式的环境来探索数据、编写代码并创建可视化报告。...

    apache-cassandra-0.8.4-bin.tar.gz 分布式数据库

    安装和使用Cassandra-0.8.4,你需要解压下载的"apache-cassandra-0.8.4"压缩包,配置相关的环境变量,启动服务,并使用CQL或者Thrift接口进行数据操作。对于初学者,理解其分布式特性和数据模型是入门的关键。同时,...

Global site tag (gtag.js) - Google Analytics