`
落叶换新叶
  • 浏览: 25637 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

基于netty的分布式模拟实现

阅读更多

 

1、RpcService,把本地的服务暴露给ZooKeeper

@Component("rpcService")
public class RpcService implements ApplicationContextAware,InitializingBean {

	private String serviceAddress;
	private Map<String,Object> handlerMap;
	private ZookeeperRegister zk;
	
	public RpcService(String serviceAddress, ZookeeperRegister zk) {
		this.serviceAddress = serviceAddress;
		this.zk = zk;
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel channel) throws Exception {
                            channel.pipeline()
                                    .addLast(new RpcDecoder(RpcRequest.class)) // 将 RPC 请求进行解码(为了处理请求)
                                    .addLast(new RpcEncoder(RpcResponse.class)) // 将 RPC 响应进行编码(为了返回响应)
                                    .addLast(new RpcHandler(handlerMap)); // 处理 RPC 请求
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
		
            String[] ipMArray = serviceAddress.split(":");
           // bootstrap.bind(ipMArray[0], Integer.parseInt(ipMArray[1]));
            
            ChannelFuture future = bootstrap.bind(ipMArray[0], Integer.parseInt(ipMArray[1])).sync();
            //注册  
            if(zk!=null) {
            	zk.register(serviceAddress);
            }
            
            future.channel().closeFuture().sync();
            
            
        }catch(Exception e) {
        	
        }finally {
        	 workerGroup.shutdownGracefully();
             bossGroup.shutdownGracefully();
        }
	}

	@Override
	public void setApplicationContext(ApplicationContext context) throws BeansException {
		handlerMap = context.getBeansWithAnnotation(RpcServiceAnnotation.class);
		
		Map<String,Object> controllerAnnotation = context.getBeansWithAnnotation(Controller.class);
		
		try {
			for(Object obj:controllerAnnotation.values()) {
				Class<?> clazz = obj.getClass();
				Field[] fields = clazz.getFields();
				for(Field f:fields) {
					boolean flag = f.isAnnotationPresent(Resource.class);
					if(flag) {
						Resource r = f.getAnnotation(Resource.class);
						String name = r.name();
						if("".equals(name)) {
							name = f.getName();
						}
						f.setAccessible(true);
						
						Object proxyObject = RpcProxy.newInstance(f.getClass());
						
						f.set(obj, proxyObject);
					}
				}
			}
		} catch (SecurityException e) {
			e.printStackTrace();
		} catch (IllegalArgumentException e) {
			e.printStackTrace();
		} catch (IllegalAccessException e) {
			e.printStackTrace();
		}
	}
}

2、RpcService处理handler

public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> {

	private Map<String,Object> handlerMap;
	
	public RpcHandler(Map<String, Object> handlerMap) {
		this.handlerMap = handlerMap;
	}

	@Override
	protected void channelRead0(ChannelHandlerContext ctx, RpcRequest req) throws Exception {
		Object handlerObject = handlerMap.get(req.getBeanName());
		
	    Class handlerObjectClass = handlerObject.getClass();
	    Method method = handlerObjectClass.getMethod(req.getMethodName(), req.getParamTypes());
	    Object result = method.invoke(handlerObject, req.getParamArgs());
	    
	    RpcResponse response = new RpcResponse();
	    response.setResult(result);
	    
	    ctx.writeAndFlush(response);
	}

}

 3、RpcDecoder,channel解码

public class RpcDecoder extends ByteToMessageDecoder {
	private Class<?> decodeClass;
	
	public RpcDecoder(Class<?> decodeClass) {
		this.decodeClass = decodeClass;
	}
	@Override
	protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
		//解码
		if(in.readableBytes()>0) {
			byte[] byteArray = new byte[in.readInt()];
			in.readBytes(byteArray);
			Object decodeObject = SerializationUtil.deserialize(byteArray, decodeClass);
			
			out.add(decodeObject);
		}
	}
}

 4、RpcEncode,发送数据转byte

public class RpcEncoder extends MessageToByteEncoder {

	private Class<?> encodeClass;
	
	public RpcEncoder(Class<?> encodeClass) {
		this.encodeClass = encodeClass;
	}

	@Override
	protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
		//编码
		if(encodeClass.isInstance(msg)) {			
			byte[] msgByte = SerializationUtil.serialize(msg);
			
			out.setInt(0, msgByte.length);
			out.readBytes(msgByte);
		}
	}
}

 5、RpcRequest,请求

public class RpcRequest implements Serializable {

	private static final long serialVersionUID = 1L;
	
	private String methodName;
	private String beanName;
	private Object[] paramArgs;
	private Class<?>[] paramTypes;
	
	public String getMethodName() {
		return methodName;
	}
	public void setMethodName(String methodName) {
		this.methodName = methodName;
	}
	public String getBeanName() {
		return beanName;
	}
	public void setBeanName(String beanName) {
		this.beanName = beanName;
	}
	public Object[] getParamArgs() {
		return paramArgs;
	}
	public void setParamArgs(Object[] paramArgs) {
		this.paramArgs = paramArgs;
	}
	public Class<?>[] getParamTypes() {
		return paramTypes;
	}
	public void setParamTypes(Class<?>[] paramTypes) {
		this.paramTypes = paramTypes;
	}
}

 6、RpcResponse

public class RpcResponse {

	public Object result;
	

	public Object getResult() {
		return result;
	}

	public void setResult(Object result) {
		this.result = result;
	}
	
}

 

7、RpcServiceAnnotation用于提供service的类

/**
 * RPC接口注解
 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcServiceAnnotation {

	public String value();
	
}

 8、BizProxy,用于Controller类属性注解

/**
 * 用于Controller类下面的属性注解
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface BizProxy {

	public String value();
	
}

 9、SerializationUtil

public class SerializationUtil {

	private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>();

    private static Objenesis objenesis = new ObjenesisStd(true);

    private SerializationUtil() {
    }

    @SuppressWarnings("unchecked")
    private static <T> Schema<T> getSchema(Class<T> cls) {
        Schema<T> schema = (Schema<T>) cachedSchema.get(cls);
        if (schema == null) {
            schema = RuntimeSchema.createFrom(cls);
            if (schema != null) {
                cachedSchema.put(cls, schema);
            }
        }
        return schema;
    }

    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> cls = (Class<T>) obj.getClass();
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            Schema<T> schema = getSchema(cls);
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        } finally {
            buffer.clear();
        }
    }

    public static <T> T deserialize(byte[] data, Class<T> cls) {
        try {
            T message = (T) objenesis.newInstance(cls);
            Schema<T> schema = getSchema(cls);
            ProtostuffIOUtil.mergeFrom(data, message, schema);
            return message;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
}

 10、ZookeeperRegister,注册ZooKeeper

@Component("zk")
public class ZookeeperRegister implements Watcher {

	private String registerAddress;
	private String childService = "nettyServiceIp";
	private CountDownLatch countDownLatch;
	private ZooKeeper zk;

	public ZookeeperRegister(String registerAddress) {
		this.registerAddress = registerAddress;
		connect();
	}
	
	public void connect() {
		try {
			countDownLatch = new CountDownLatch(1);
			zk = new ZooKeeper(registerAddress, ZookeeperContant.SESSION_TIME_OUT, this);
			countDownLatch.await();
			Stat stat = zk.exists(ZookeeperContant.ROOT, false);
			if(stat==null) {
				zk.create(ZookeeperContant.ROOT, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void register(String serviceAddress) {
		try {
			//创建子节点
			zk.create(ZookeeperContant.ROOT+"/"+childService, serviceAddress.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	@Override
	public void process(WatchedEvent arg0) {
		if (arg0.getState() == Event.KeeperState.SyncConnected) {
			countDownLatch.countDown();
        }
	}
}

 11、ZookeeperContant

public interface ZookeeperContant {

	public String CHILD_SERVICE = "nettyServiceIp";
	
	public String ROOT = "netty";
	
	public int SESSION_TIME_OUT = 2000;
}

 以下是客户端的类,用于获取服务地址和发送请求

 

12、RpcClient,发送请求

public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> {

	private ServiceDiscovery serviceDiscovery;
	
	private RpcResponse response;
	
	private Object object = new Object();
	
	public RpcClient() {
	}

	public RpcClient(ServiceDiscovery serviceDiscovery) {
		this.serviceDiscovery = serviceDiscovery;
	}
	
	public RpcResponse send(RpcRequest req) {
		EventLoopGroup bossGroup = new NioEventLoopGroup();
		try {
			Bootstrap bootstrap = new Bootstrap();
			bootstrap.group(bossGroup).channel(NioSocketChannel.class)
			        .handler(new ChannelInitializer<SocketChannel>() {
			            @Override
			            public void initChannel(SocketChannel channel) throws Exception {
			                channel.pipeline()
			                        .addLast(new RpcDecoder(RpcRequest.class)) // 将 RPC 请求进行解码(为了处理请求)
			                        .addLast(new RpcEncoder(RpcResponse.class)) // 将 RPC 响应进行编码(为了返回响应)
			                        .addLast(RpcClient.this); // 处理 RPC 请求
			            }
			        })
			        .option(ChannelOption.SO_KEEPALIVE, true);
			
			String addressIp = serviceDiscovery.discovery();
			String[] ipArray = addressIp.split(":");
			//发送数据
			ChannelFuture future = bootstrap.connect(ipArray[0], Integer.parseInt(ipArray[1])).sync();
			future.channel().writeAndFlush(req);
			//等待返回值
			synchronized (object) {
				object.wait();
			}
			
			
		} catch (Exception e) {
			e.printStackTrace();
		}finally {
            bossGroup.shutdownGracefully();
		}
		return response;
	}
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, RpcResponse res) throws Exception {
		response = res;
		synchronized (object) {
			object.notifyAll();
		}
	}
}

 13、RpcProxy,初始化对象

public class RpcProxy {
	private Object target;
	public static Object newInstance(final Class<?> cls) {
		//RpcServiceAnnotation annotation = cls.getAnnotation(RpcServiceAnnotation.class);
		//final String beanName = annotation.value();		
		return Proxy.newProxyInstance(cls.getClassLoader(), cls.getInterfaces(), new InvocationHandler() {
			@Override
			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
				RpcRequest req = new RpcRequest();
				req.setBeanName(cls.getName());       //需要处理
				req.setMethodName(method.getName());
				req.setParamArgs(args);
				req.setParamTypes(method.getParameterTypes());
				
				RpcClient client = new RpcClient();
				RpcResponse response = client.send(req);
				
				return response;
			}
		});
	}
}

 14、ServiceDiscovery和ServiceDiscoveryImpl

public interface ServiceDiscovery {

	public String discovery();
	
}
@Component("serviceDiscovery")
public class ServiceDiscoveryImpl implements ServiceDiscovery,Watcher {
	private String serviceDiscoveryAddress;
	private CountDownLatch latch = new CountDownLatch(1);
	private ZooKeeper zk;	
	public ServiceDiscoveryImpl(String serviceDiscoveryAddress) {
		this.serviceDiscoveryAddress = serviceDiscoveryAddress;
		connect();
	}	
	public void connect() {
		try {			
			zk = new ZooKeeper(serviceDiscoveryAddress,ZookeeperContant.SESSION_TIME_OUT,this);
			latch.await();
		}catch(Exception e) {
			e.printStackTrace();
		}
	}
	@Override
	public String discovery() {
		try {
			List<String> serviceIpList = new ArrayList<String>();
			List<String> listAddress = zk.getChildren(ZookeeperContant.ROOT+"/"+ZookeeperContant.CHILD_SERVICE, false);
			for(String str:listAddress) {
				byte[] ipBytes = zk.getData(str, false, new Stat());
				String address = new String(ipBytes);
				serviceIpList.add(address);
			}
			if(serviceIpList.size()==1) {
				return serviceIpList.get(0);
			}
			Random random = new Random();
			
			String addressIp = serviceIpList.get(random.nextInt(serviceIpList.size()));
			
			return addressIp;
			
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		return null;
	}

	@Override
	public void process(WatchedEvent event) {
		if(event.getState().equals(Event.KeeperState.SyncConnected)) {
			latch.countDown();
		}
	}
}

 15、BeanFactoryProcessTest,Scan,BeanFactoryBeanTest

@Component("beanFactoryProcess")
public class BeanFactoryProcessTest implements ApplicationContextAware,BeanFactoryPostProcessor {

	private ApplicationContext applicationContext;
	
	@Override
	public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
		Scan san = new Scan((BeanDefinitionRegistry)beanFactory);
		san.setResourceLoader(applicationContext);
	    san.scan("com.paic.egis.*");
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.applicationContext = applicationContext;
	}
}

 

public class BeanFactoryBeanTest<T> implements FactoryBean<T> {

	private String innerClassName;
	
	public BeanFactoryBeanTest(String innerClassName) {
		this.innerClassName = innerClassName;
	}

	@Override
	public T getObject() throws Exception {
		final Class<?> className = Class.forName(innerClassName);
		if(className.isInterface()) {
			return (T)RpcProxy.newInstance(className);
		}else {
			 Enhancer enhancer = new Enhancer();
	         enhancer.setSuperclass(className);
	         enhancer.setNamingPolicy((NamingPolicy) SpringNamingPolicy.INSTANCE);
	         enhancer.setCallback(new MethodInterceptor() {
				
				@Override
				public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
					
					RpcRequest req = new RpcRequest();
					req.setBeanName(className.getName());       //需要处理
					req.setMethodName(method.getName());
					req.setParamArgs(args);
					req.setParamTypes(method.getParameterTypes());
					
					RpcClient client = new RpcClient();
					RpcResponse response = client.send(req);
					
					return response;
				}
			});
	         return (T) enhancer.create();
		}
	}

	@Override
	public Class<?> getObjectType() {
		try {
			return Class.forName(innerClassName);
		} catch (ClassNotFoundException e) {
			e.printStackTrace();
		}
		return null;
	}

	@Override
	public boolean isSingleton() {
		
		return true;
	}
	
}

 

public class Scan extends ClassPathBeanDefinitionScanner {

	public Scan(BeanDefinitionRegistry registry) {
		super(registry);
	}

	@Override
	protected Set<BeanDefinitionHolder> doScan(String... basePackages) {
		Set<BeanDefinitionHolder> definitionHolders = super.doScan(basePackages);
		for(BeanDefinitionHolder holder:definitionHolders) {
			GenericBeanDefinition bean = (GenericBeanDefinition)holder.getBeanDefinition();
			bean.getPropertyValues().add("innerClass", bean.getBeanClassName());
			bean.setBeanClass(BeanFactoryBeanTest.class);
		}
		return definitionHolders;
	}

	@Override
	protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) {
		
		 return super.isCandidateComponent(beanDefinition) && beanDefinition.getMetadata()
				 .hasAnnotation(RpcServiceAnnotation.class.getName());
		//return super.isCandidateComponent(beanDefinition);
	}
}

 

 

 

 

 

 

 

 

 

 

 

 

0
0
分享到:
评论

相关推荐

    TCP调试助手(基于Netty)

    【TCP调试助手(基于Netty)】是一款专为Windows平台设计的TCP协议调试软件,它利用了Netty这一高性能、异步事件驱动的网络应用框架。Netty是由Java编写的,广泛应用于各种分布式系统、服务器和客户端之间的通信,尤其...

    基于springcloud+Netty+MQ+mysql的分布式即时聊天系统.zip

    基于springcloud+Netty+MQ+mysql的分布式即时聊天系统.zip基于springcloud+Netty+MQ+mysql的分布式即时聊天系统.zip基于springcloud+Netty+MQ+mysql的分布式即时聊天系统.zip基于springcloud+Netty+MQ+mysql的分布式...

    Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议

    本文将深入探讨如何使用Java的Netty框架实现一个基于DTU(Data Transfer Unit)的TCP服务器,该服务器具备多端口通信和多协议解析的能力。 首先,DTU是一种专门用于远程数据传输的设备,它能够通过GPRS、3G/4G等...

    基于Netty+kafka的物联网数据采集脚手架..zip

    "基于Netty+kafka的物联网数据采集脚手架"是一个专门针对这一需求设计的解决方案。Netty是一个高性能、异步事件驱动的网络应用程序框架,而Kafka是一款分布式流处理平台,两者结合可以构建出高效稳定的数据采集系统...

    netty-netty-4.1.19.Final.zip_netty_netty学习_rocketmq

    Netty的核心设计理念是基于Reactor模式,这是一种事件驱动的设计模式,它使得Netty能够高效地处理并发连接。在Netty中,EventLoop(事件循环)负责处理I/O事件,而Channel(通道)则代表网络连接,它们是进行数据...

    netty demo 在线订票

    在这个“netty demo 在线订票”项目中,我们将探讨如何利用Netty来模拟实现一个多线程的在线订票系统,以此来学习Netty的核心技术和Java编程。 在Java世界中,Netty因其高效、灵活以及丰富的特性而被广泛应用在各种...

    Netty私协议

    Netty私协议是一种基于Java的高性能网络通信框架——Netty实现的自定义通信协议。Netty是一个异步事件驱动的网络应用框架,适用于快速开发可维护的高性能协议服务器和客户端。在众多分布式系统、微服务架构以及游戏...

    Netty服务端和客户端调用demo

    Netty服务端和客户端调用demo是一个基于Spring Boot和Maven构建的应用,它演示了如何使用Netty框架进行网络通信。Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发服务器和客户端的Java应用。这个demo...

    微服务分布式系统架构之zookeeper与dubbo.pdf

    Dubbo是由阿里巴巴开源的一个高性能Java RPC框架,它基于Netty进行通信,提供了高效的远程调用能力。在微服务架构中,Dubbo可以将不同的微服务通过RPC的方式进行通信和调用,它可以集成SpringBoot进行开发,使得...

    Netty服务端与客户端依靠Java序列化传输数据

    对于基于Java序列化的Netty应用,单元测试和集成测试至关重要,确保数据的正确序列化和反序列化。可以使用如Mockito和WireMock等工具模拟服务端和客户端交互,验证数据传输的正确性。 综上所述,Netty服务端和...

    使用java基于raft协议实现的kv数据库.zip

    在Java实现的这个kv数据库中,开发者可能会采用多线程和网络通信技术来模拟分布式环境。每个节点都会有一个独立的线程来处理其角色,如监听网络通信、处理选举和日志同步。Raft的核心机制包括选举逻辑、日志复制和...

    Netty实例-群聊系统.zip

    在这个“Netty实例-群聊系统”中,我们将探讨如何使用Netty实现一个简单的TCP群聊系统。TCP(Transmission Control Protocol)是一种面向连接的、可靠的、基于字节流的传输层通信协议,它确保了数据的有序和无损传输...

    基于socket的rpc

    在本文中,我们将探讨基于Socket的RPC实现以及后续的改进计划,包括升级到Netty作为底层通信库,以及引入Zookeeper作为注册中心。 首先,让我们深入了解基于Socket的RPC工作原理。在客户端,我们定义了一个接口,...

    netty-akka-template:带有Netty和Akka的入门Websocket服务器

    **标题解析:** "Netty-Akka-Template" ...这个项目为学习者提供了一个实践平台,通过它,开发者可以深入理解如何利用Netty处理网络连接,以及如何借助Akka处理并发和分布式系统的问题,同时掌握WebSocket的实现细节。

    毕设&课设&项目&实训-基于Web的聊天系统-模拟QQ的基本功能.zip

    基于Web的聊天系统-模拟QQ的基本功能 ( springboot, netty, jpa, mybatis, mysql, redis, vue ).所有源码均经过严格测试,可以直接运行,可以放心下载使用。有任何使用问题欢迎随时与博主沟通,第一时间进行解答!该...

    java版仿QQ即时通讯系统

    【Java版仿QQ即时通讯系统】是一个基于Java技术构建的实时通信平台,旨在模拟QQ的聊天功能。在Java编程环境中,MyEclipse被用作集成开发环境(IDE),提供了便捷的代码编写、调试和管理工具,使得开发过程更加高效。...

    基于图像识别的跨平台测试脚本录制与回放系统的设计与实现1

    系统利用ADB和WDA工具对Android和iOS设备进行统一管理,通过MiniCap快速获取设备屏幕截图,借助Netty框架实现实时的Web端与设备间的TCP通信,确保界面同步。此外,系统利用SIFT(尺度不变特征变换)和OCR(光学字符...

    Apus:基于Scala和Akka的高性能分布式XMPP服务器

    特征: 高性能:系统采用完全异步方式实现,并发使用actor模型(akka),网络IO使用NIO(netty)。 no SPOF(单点故障):消息路由算法基于一致性哈希设计,集群中没有主节点。 #表现##实验服务器集群:2 个具有...

    基于dubbo的服务方完整项目示例

    - Dubbo采用基于Netty的异步通讯模型,提高了服务调用的性能。 8. **监控与治理** - Dubbo提供了监控中心,可以统计服务的调用次数、调用时间、失败率等指标,帮助我们监控和管理服务。 - 可以通过`...

    从零开始手写 dubbo rpc 框架-11-fail失败策略.pdf

    在这个例子中,框架是基于Netty实现的,Netty是一个高效的异步事件驱动的网络应用程序框架,常用于创建高并发、高性能的服务器和客户端应用。Netty提供了低级别的网络操作API,使得我们能够方便地构建RPC通信。 在...

Global site tag (gtag.js) - Google Analytics