1、RpcService,把本地的服务暴露给ZooKeeper
@Component("rpcService") public class RpcService implements ApplicationContextAware,InitializingBean { private String serviceAddress; private Map<String,Object> handlerMap; private ZookeeperRegister zk; public RpcService(String serviceAddress, ZookeeperRegister zk) { this.serviceAddress = serviceAddress; this.zk = zk; } @Override public void afterPropertiesSet() throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new RpcDecoder(RpcRequest.class)) // 将 RPC 请求进行解码(为了处理请求) .addLast(new RpcEncoder(RpcResponse.class)) // 将 RPC 响应进行编码(为了返回响应) .addLast(new RpcHandler(handlerMap)); // 处理 RPC 请求 } }) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); String[] ipMArray = serviceAddress.split(":"); // bootstrap.bind(ipMArray[0], Integer.parseInt(ipMArray[1])); ChannelFuture future = bootstrap.bind(ipMArray[0], Integer.parseInt(ipMArray[1])).sync(); //注册 if(zk!=null) { zk.register(serviceAddress); } future.channel().closeFuture().sync(); }catch(Exception e) { }finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } @Override public void setApplicationContext(ApplicationContext context) throws BeansException { handlerMap = context.getBeansWithAnnotation(RpcServiceAnnotation.class); Map<String,Object> controllerAnnotation = context.getBeansWithAnnotation(Controller.class); try { for(Object obj:controllerAnnotation.values()) { Class<?> clazz = obj.getClass(); Field[] fields = clazz.getFields(); for(Field f:fields) { boolean flag = f.isAnnotationPresent(Resource.class); if(flag) { Resource r = f.getAnnotation(Resource.class); String name = r.name(); if("".equals(name)) { name = f.getName(); } f.setAccessible(true); Object proxyObject = RpcProxy.newInstance(f.getClass()); f.set(obj, proxyObject); } } } } catch (SecurityException e) { e.printStackTrace(); } catch (IllegalArgumentException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } } }
2、RpcService处理handler
public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> { private Map<String,Object> handlerMap; public RpcHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest req) throws Exception { Object handlerObject = handlerMap.get(req.getBeanName()); Class handlerObjectClass = handlerObject.getClass(); Method method = handlerObjectClass.getMethod(req.getMethodName(), req.getParamTypes()); Object result = method.invoke(handlerObject, req.getParamArgs()); RpcResponse response = new RpcResponse(); response.setResult(result); ctx.writeAndFlush(response); } }
3、RpcDecoder,channel解码
public class RpcDecoder extends ByteToMessageDecoder { private Class<?> decodeClass; public RpcDecoder(Class<?> decodeClass) { this.decodeClass = decodeClass; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //解码 if(in.readableBytes()>0) { byte[] byteArray = new byte[in.readInt()]; in.readBytes(byteArray); Object decodeObject = SerializationUtil.deserialize(byteArray, decodeClass); out.add(decodeObject); } } }
4、RpcEncode,发送数据转byte
public class RpcEncoder extends MessageToByteEncoder { private Class<?> encodeClass; public RpcEncoder(Class<?> encodeClass) { this.encodeClass = encodeClass; } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { //编码 if(encodeClass.isInstance(msg)) { byte[] msgByte = SerializationUtil.serialize(msg); out.setInt(0, msgByte.length); out.readBytes(msgByte); } } }
5、RpcRequest,请求
public class RpcRequest implements Serializable { private static final long serialVersionUID = 1L; private String methodName; private String beanName; private Object[] paramArgs; private Class<?>[] paramTypes; public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public String getBeanName() { return beanName; } public void setBeanName(String beanName) { this.beanName = beanName; } public Object[] getParamArgs() { return paramArgs; } public void setParamArgs(Object[] paramArgs) { this.paramArgs = paramArgs; } public Class<?>[] getParamTypes() { return paramTypes; } public void setParamTypes(Class<?>[] paramTypes) { this.paramTypes = paramTypes; } }
6、RpcResponse
public class RpcResponse { public Object result; public Object getResult() { return result; } public void setResult(Object result) { this.result = result; } }
7、RpcServiceAnnotation用于提供service的类
/** * RPC接口注解 */ @Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcServiceAnnotation { public String value(); }
8、BizProxy,用于Controller类属性注解
/** * 用于Controller类下面的属性注解 */ @Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Component public @interface BizProxy { public String value(); }
9、SerializationUtil
public class SerializationUtil { private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<>(); private static Objenesis objenesis = new ObjenesisStd(true); private SerializationUtil() { } @SuppressWarnings("unchecked") private static <T> Schema<T> getSchema(Class<T> cls) { Schema<T> schema = (Schema<T>) cachedSchema.get(cls); if (schema == null) { schema = RuntimeSchema.createFrom(cls); if (schema != null) { cachedSchema.put(cls, schema); } } return schema; } @SuppressWarnings("unchecked") public static <T> byte[] serialize(T obj) { Class<T> cls = (Class<T>) obj.getClass(); LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE); try { Schema<T> schema = getSchema(cls); return ProtostuffIOUtil.toByteArray(obj, schema, buffer); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } finally { buffer.clear(); } } public static <T> T deserialize(byte[] data, Class<T> cls) { try { T message = (T) objenesis.newInstance(cls); Schema<T> schema = getSchema(cls); ProtostuffIOUtil.mergeFrom(data, message, schema); return message; } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } } }
10、ZookeeperRegister,注册ZooKeeper
@Component("zk") public class ZookeeperRegister implements Watcher { private String registerAddress; private String childService = "nettyServiceIp"; private CountDownLatch countDownLatch; private ZooKeeper zk; public ZookeeperRegister(String registerAddress) { this.registerAddress = registerAddress; connect(); } public void connect() { try { countDownLatch = new CountDownLatch(1); zk = new ZooKeeper(registerAddress, ZookeeperContant.SESSION_TIME_OUT, this); countDownLatch.await(); Stat stat = zk.exists(ZookeeperContant.ROOT, false); if(stat==null) { zk.create(ZookeeperContant.ROOT, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } catch (Exception e) { e.printStackTrace(); } } public void register(String serviceAddress) { try { //创建子节点 zk.create(ZookeeperContant.ROOT+"/"+childService, serviceAddress.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); } catch (Exception e) { e.printStackTrace(); } } @Override public void process(WatchedEvent arg0) { if (arg0.getState() == Event.KeeperState.SyncConnected) { countDownLatch.countDown(); } } }
11、ZookeeperContant
public interface ZookeeperContant { public String CHILD_SERVICE = "nettyServiceIp"; public String ROOT = "netty"; public int SESSION_TIME_OUT = 2000; }
以下是客户端的类,用于获取服务地址和发送请求
12、RpcClient,发送请求
public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> { private ServiceDiscovery serviceDiscovery; private RpcResponse response; private Object object = new Object(); public RpcClient() { } public RpcClient(ServiceDiscovery serviceDiscovery) { this.serviceDiscovery = serviceDiscovery; } public RpcResponse send(RpcRequest req) { EventLoopGroup bossGroup = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(bossGroup).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline() .addLast(new RpcDecoder(RpcRequest.class)) // 将 RPC 请求进行解码(为了处理请求) .addLast(new RpcEncoder(RpcResponse.class)) // 将 RPC 响应进行编码(为了返回响应) .addLast(RpcClient.this); // 处理 RPC 请求 } }) .option(ChannelOption.SO_KEEPALIVE, true); String addressIp = serviceDiscovery.discovery(); String[] ipArray = addressIp.split(":"); //发送数据 ChannelFuture future = bootstrap.connect(ipArray[0], Integer.parseInt(ipArray[1])).sync(); future.channel().writeAndFlush(req); //等待返回值 synchronized (object) { object.wait(); } } catch (Exception e) { e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); } return response; } @Override protected void channelRead0(ChannelHandlerContext ctx, RpcResponse res) throws Exception { response = res; synchronized (object) { object.notifyAll(); } } }
13、RpcProxy,初始化对象
public class RpcProxy { private Object target; public static Object newInstance(final Class<?> cls) { //RpcServiceAnnotation annotation = cls.getAnnotation(RpcServiceAnnotation.class); //final String beanName = annotation.value(); return Proxy.newProxyInstance(cls.getClassLoader(), cls.getInterfaces(), new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest req = new RpcRequest(); req.setBeanName(cls.getName()); //需要处理 req.setMethodName(method.getName()); req.setParamArgs(args); req.setParamTypes(method.getParameterTypes()); RpcClient client = new RpcClient(); RpcResponse response = client.send(req); return response; } }); } }
14、ServiceDiscovery和ServiceDiscoveryImpl
public interface ServiceDiscovery { public String discovery(); }
@Component("serviceDiscovery") public class ServiceDiscoveryImpl implements ServiceDiscovery,Watcher { private String serviceDiscoveryAddress; private CountDownLatch latch = new CountDownLatch(1); private ZooKeeper zk; public ServiceDiscoveryImpl(String serviceDiscoveryAddress) { this.serviceDiscoveryAddress = serviceDiscoveryAddress; connect(); } public void connect() { try { zk = new ZooKeeper(serviceDiscoveryAddress,ZookeeperContant.SESSION_TIME_OUT,this); latch.await(); }catch(Exception e) { e.printStackTrace(); } } @Override public String discovery() { try { List<String> serviceIpList = new ArrayList<String>(); List<String> listAddress = zk.getChildren(ZookeeperContant.ROOT+"/"+ZookeeperContant.CHILD_SERVICE, false); for(String str:listAddress) { byte[] ipBytes = zk.getData(str, false, new Stat()); String address = new String(ipBytes); serviceIpList.add(address); } if(serviceIpList.size()==1) { return serviceIpList.get(0); } Random random = new Random(); String addressIp = serviceIpList.get(random.nextInt(serviceIpList.size())); return addressIp; } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } @Override public void process(WatchedEvent event) { if(event.getState().equals(Event.KeeperState.SyncConnected)) { latch.countDown(); } } }
15、BeanFactoryProcessTest,Scan,BeanFactoryBeanTest
@Component("beanFactoryProcess") public class BeanFactoryProcessTest implements ApplicationContextAware,BeanFactoryPostProcessor { private ApplicationContext applicationContext; @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { Scan san = new Scan((BeanDefinitionRegistry)beanFactory); san.setResourceLoader(applicationContext); san.scan("com.paic.egis.*"); } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
public class BeanFactoryBeanTest<T> implements FactoryBean<T> { private String innerClassName; public BeanFactoryBeanTest(String innerClassName) { this.innerClassName = innerClassName; } @Override public T getObject() throws Exception { final Class<?> className = Class.forName(innerClassName); if(className.isInterface()) { return (T)RpcProxy.newInstance(className); }else { Enhancer enhancer = new Enhancer(); enhancer.setSuperclass(className); enhancer.setNamingPolicy((NamingPolicy) SpringNamingPolicy.INSTANCE); enhancer.setCallback(new MethodInterceptor() { @Override public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable { RpcRequest req = new RpcRequest(); req.setBeanName(className.getName()); //需要处理 req.setMethodName(method.getName()); req.setParamArgs(args); req.setParamTypes(method.getParameterTypes()); RpcClient client = new RpcClient(); RpcResponse response = client.send(req); return response; } }); return (T) enhancer.create(); } } @Override public Class<?> getObjectType() { try { return Class.forName(innerClassName); } catch (ClassNotFoundException e) { e.printStackTrace(); } return null; } @Override public boolean isSingleton() { return true; } }
public class Scan extends ClassPathBeanDefinitionScanner { public Scan(BeanDefinitionRegistry registry) { super(registry); } @Override protected Set<BeanDefinitionHolder> doScan(String... basePackages) { Set<BeanDefinitionHolder> definitionHolders = super.doScan(basePackages); for(BeanDefinitionHolder holder:definitionHolders) { GenericBeanDefinition bean = (GenericBeanDefinition)holder.getBeanDefinition(); bean.getPropertyValues().add("innerClass", bean.getBeanClassName()); bean.setBeanClass(BeanFactoryBeanTest.class); } return definitionHolders; } @Override protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) { return super.isCandidateComponent(beanDefinition) && beanDefinition.getMetadata() .hasAnnotation(RpcServiceAnnotation.class.getName()); //return super.isCandidateComponent(beanDefinition); } }
相关推荐
【TCP调试助手(基于Netty)】是一款专为Windows平台设计的TCP协议调试软件,它利用了Netty这一高性能、异步事件驱动的网络应用框架。Netty是由Java编写的,广泛应用于各种分布式系统、服务器和客户端之间的通信,尤其...
基于springcloud+Netty+MQ+mysql的分布式即时聊天系统.zip基于springcloud+Netty+MQ+mysql的分布式即时聊天系统.zip基于springcloud+Netty+MQ+mysql的分布式即时聊天系统.zip基于springcloud+Netty+MQ+mysql的分布式...
本文将深入探讨如何使用Java的Netty框架实现一个基于DTU(Data Transfer Unit)的TCP服务器,该服务器具备多端口通信和多协议解析的能力。 首先,DTU是一种专门用于远程数据传输的设备,它能够通过GPRS、3G/4G等...
"基于Netty+kafka的物联网数据采集脚手架"是一个专门针对这一需求设计的解决方案。Netty是一个高性能、异步事件驱动的网络应用程序框架,而Kafka是一款分布式流处理平台,两者结合可以构建出高效稳定的数据采集系统...
Netty的核心设计理念是基于Reactor模式,这是一种事件驱动的设计模式,它使得Netty能够高效地处理并发连接。在Netty中,EventLoop(事件循环)负责处理I/O事件,而Channel(通道)则代表网络连接,它们是进行数据...
在这个“netty demo 在线订票”项目中,我们将探讨如何利用Netty来模拟实现一个多线程的在线订票系统,以此来学习Netty的核心技术和Java编程。 在Java世界中,Netty因其高效、灵活以及丰富的特性而被广泛应用在各种...
Netty私协议是一种基于Java的高性能网络通信框架——Netty实现的自定义通信协议。Netty是一个异步事件驱动的网络应用框架,适用于快速开发可维护的高性能协议服务器和客户端。在众多分布式系统、微服务架构以及游戏...
Netty服务端和客户端调用demo是一个基于Spring Boot和Maven构建的应用,它演示了如何使用Netty框架进行网络通信。Netty是一个高性能、异步事件驱动的网络应用框架,适用于开发服务器和客户端的Java应用。这个demo...
Dubbo是由阿里巴巴开源的一个高性能Java RPC框架,它基于Netty进行通信,提供了高效的远程调用能力。在微服务架构中,Dubbo可以将不同的微服务通过RPC的方式进行通信和调用,它可以集成SpringBoot进行开发,使得...
对于基于Java序列化的Netty应用,单元测试和集成测试至关重要,确保数据的正确序列化和反序列化。可以使用如Mockito和WireMock等工具模拟服务端和客户端交互,验证数据传输的正确性。 综上所述,Netty服务端和...
在Java实现的这个kv数据库中,开发者可能会采用多线程和网络通信技术来模拟分布式环境。每个节点都会有一个独立的线程来处理其角色,如监听网络通信、处理选举和日志同步。Raft的核心机制包括选举逻辑、日志复制和...
在这个“Netty实例-群聊系统”中,我们将探讨如何使用Netty实现一个简单的TCP群聊系统。TCP(Transmission Control Protocol)是一种面向连接的、可靠的、基于字节流的传输层通信协议,它确保了数据的有序和无损传输...
在本文中,我们将探讨基于Socket的RPC实现以及后续的改进计划,包括升级到Netty作为底层通信库,以及引入Zookeeper作为注册中心。 首先,让我们深入了解基于Socket的RPC工作原理。在客户端,我们定义了一个接口,...
**标题解析:** "Netty-Akka-Template" ...这个项目为学习者提供了一个实践平台,通过它,开发者可以深入理解如何利用Netty处理网络连接,以及如何借助Akka处理并发和分布式系统的问题,同时掌握WebSocket的实现细节。
基于Web的聊天系统-模拟QQ的基本功能 ( springboot, netty, jpa, mybatis, mysql, redis, vue ).所有源码均经过严格测试,可以直接运行,可以放心下载使用。有任何使用问题欢迎随时与博主沟通,第一时间进行解答!该...
【Java版仿QQ即时通讯系统】是一个基于Java技术构建的实时通信平台,旨在模拟QQ的聊天功能。在Java编程环境中,MyEclipse被用作集成开发环境(IDE),提供了便捷的代码编写、调试和管理工具,使得开发过程更加高效。...
系统利用ADB和WDA工具对Android和iOS设备进行统一管理,通过MiniCap快速获取设备屏幕截图,借助Netty框架实现实时的Web端与设备间的TCP通信,确保界面同步。此外,系统利用SIFT(尺度不变特征变换)和OCR(光学字符...
特征: 高性能:系统采用完全异步方式实现,并发使用actor模型(akka),网络IO使用NIO(netty)。 no SPOF(单点故障):消息路由算法基于一致性哈希设计,集群中没有主节点。 #表现##实验服务器集群:2 个具有...
- Dubbo采用基于Netty的异步通讯模型,提高了服务调用的性能。 8. **监控与治理** - Dubbo提供了监控中心,可以统计服务的调用次数、调用时间、失败率等指标,帮助我们监控和管理服务。 - 可以通过`...
在这个例子中,框架是基于Netty实现的,Netty是一个高效的异步事件驱动的网络应用程序框架,常用于创建高并发、高性能的服务器和客户端应用。Netty提供了低级别的网络操作API,使得我们能够方便地构建RPC通信。 在...