`
cywhoyi
  • 浏览: 423201 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

异步调度设计

阅读更多

平常我们做应用功能的时候,经常会碰到A、B、C等多起事件取数据,进行组装后反馈结果集。

 

//用户信息
userService.getProfile(userId);
//用户信用信息
accountService.getAccount(userId);
//SNS
snsService.getSNS(userId);
//send email\sms
platformService.sendMessage(userId,message)

 

 综上所述,我们能够观察得到其实上述4次事件完全可以并行处理,现有的方式我们的执行计划是采纳串行的方式



 

 明显能够感觉得到A+B+C+D所消耗的时间才是这个method方法执行所消耗的时间,那如果采纳为并行的方式进行调用呢?

串行计划耗时:1+1+3+1=6

并行计划耗时:3



 

底层最为重要的一步,通过代理的方式,把类中的method都进行future调用



 

采用的是阿里的异步调用组件,但是代码有些老我进行了微调优,但是核心的内容主要还是proxy的方式,代码挺丑的,但是蛮有趣的方式进行异步调用。

考虑到并不是每个逻辑处理都需要异步的方式,

spring配置

 

<bean id="autoProxyCreator"
		  class="spring.lifecycle.BeanNameAutoProxyCreator">
		<property name="beanNames">
			<list>

				<value>*Service</value>
				<value>*DAO</value>
			</list>
		</property>
		<property name="interceptorFilter">
			<map>
				<entry key="*Service" value="transactionInterceptor,paramterInterceptor,monitorInterceptor"/>
			</map>
		</property>
	</bean>
 我在AbstractAutoProxyCreator中新增的一属性值
  private Map<String, String> interceptorFilter = new HashMap<String, String>();
 然后进行拦截工作,指定不同的Service被拦截不同的拦截器
 protected Advisor[] buildAdvisors(String beanName, Object[] specificInterceptors) {
        Iterator itor = interceptorFilter.entrySet().iterator();
        boolean ismatch = false;
        while (itor.hasNext()) {
            Map.Entry<String, String> entry = (Map.Entry<String, String>) itor.next();
            String key = entry.getKey();
            String values = entry.getValue();
            if (key.contains("*")) {
                key = key.replace("*", "");
                Pattern pattern = Pattern.compile(".*(" + key + "$)");
                Matcher matcher = pattern.matcher(beanName);
                ismatch = matcher.matches();
            } else {
                ismatch = key.matches(beanName);
            }
            if (ismatch) {
                this.interceptorNames = values.split(",");
            }
        }
...
 

 

前几天阿里的一位问起如何有没有考虑异步校验工作,一开始时候没想到,但是回过头想了下异步校验可能有其特殊的场合需要,正常情况下应该只需要顺序校验field。所以依托于其组件上层,写了一个异步注解的校验方式

核心代码如下

 

 public void validateFieldMethod(final Object o) throws Exception {
        Field[] fields = o.getClass().getDeclaredFields();
        for (final Field field : fields) {
            if (field.getAnnotation(ValidationDriver.class) != null) {
                field.setAccessible(true);
                Future future = executor.submit(new AsyncLoadCallable() {
                    public Object call() throws Exception {
                        try {
                            return AnnotationValidator.validate(field.get(o));
                        } catch (Throwable e) {
                            throw new AsyncLoadException("future invoke error!", e);
                        }
                    }

                    public AsyncLoadConfig getConfig() {
                        return config;
                    }
                });

            }
        }
    }

 

 

 

public class AsyncLoadEnhanceProxy<T> implements AsyncLoadProxy<T> {

    private T                 service;
    private AsyncLoadConfig   config;
    private AsyncLoadExecutor executor;
    private Class<T>          targetClass;

    public AsyncLoadEnhanceProxy(){
    }

    public AsyncLoadEnhanceProxy(T service, AsyncLoadExecutor executor){
        this(service, new AsyncLoadConfig(), executor);
    }

    public AsyncLoadEnhanceProxy(T service, AsyncLoadConfig config, AsyncLoadExecutor executor){
        this.service = service;
        this.config = config;
        this.executor = executor;
        this.targetClass = (Class<T>) service.getClass();// 默认的代理class对象即为service
    }

    public T getProxy() {
        validate();
        return getProxyInternal();
    }

    /**
     * 相应的检查方法
     */
    private void validate() {
        AsyncLoadUtils.notNull(service, "service should not be null");
        AsyncLoadUtils.notNull(config, "config should not be null");
        AsyncLoadUtils.notNull(executor, "executor should not be null");

        if (Modifier.isFinal(targetClass.getModifiers())) { // 目前暂不支持final类型的处理,以后可以考虑使用jdk
                                                            // proxy
            throw new AsyncLoadException("Enhance proxy not support final class :" + targetClass.getName());
        }

        if (!Modifier.isPublic(targetClass.getModifiers())) {
            // 处理如果是非public属性,则不进行代理,强制访问会出现IllegalAccessException,比如一些内部类或者匿名类不允许直接访问
            throw new AsyncLoadException("Enhance proxy not support private/protected class :" + targetClass.getName());
        }
    }


    /**
     * 异步校验
     * @param o
     * @throws Exception
     */
    public void validateFieldMethod(final Object o) throws Exception {
        Field[] fields = o.getClass().getDeclaredFields();
        for (final Field field : fields) {
            if (field.getAnnotation(ValidationDriver.class) != null) {
                field.setAccessible(true);
                Future future = executor.submit(new AsyncLoadCallable() {
                    public Object call() throws Exception {
                        try {
                            return AnnotationValidator.validate(field.get(o));
                        } catch (Throwable e) {
                            throw new AsyncLoadException("future invoke error!", e);
                        }
                    }

                    public AsyncLoadConfig getConfig() {
                        return config;
                    }
                });

            }
        }
    }
    class AsyncLoadCallbackFilter implements CallbackFilter {

        public int accept(Method method) {
            // 预先进行匹配,直接计算好需要处理的method,避免动态匹配浪费性能
            if (AsyncLoadObject.class.isAssignableFrom(method.getDeclaringClass())) {// 判断对应的方法是否属于AsyncLoadObject
                return 0; // for AsyncLoadServiceInterceptor
            } else {
                Map<AsyncLoadMethodMatch, Long> matches = config.getMatches();
                Set<AsyncLoadMethodMatch> methodMatchs = matches.keySet();
                if (methodMatchs != null && !methodMatchs.isEmpty()) {
                    for (Iterator<AsyncLoadMethodMatch> methodMatch = methodMatchs.iterator(); methodMatch.hasNext();) {
                        if (methodMatch.next().matches(method)) {
                            return 2; // for AsyncLoadInterceptor
                        }
                    }
                }
                return 1; // for AsyncLoadDirect
            }
        }
    }

    class AsyncLoadServiceInterceptor implements MethodInterceptor {

        public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
            if ("_getOriginalClass".equals(method.getName())) {
                return getOriginalClass();
            }
            throw new AsyncLoadException("method[" + method.getName() + "] is not support!");
        }

        private Object getOriginalClass() {
            return targetClass;
        }
    }

    class AsyncLoadDirect implements Dispatcher {

        public Object loadObject() throws Exception {
            return service;
        }

    }

    class AsyncLoadInterceptor implements MethodInterceptor {

        public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
            Long timeout = getMatchTimeout(method);
            final Object finObj = service;
            final Object[] finArgs = args;
            final Method finMethod = method;

            Class returnClass = method.getReturnType();
            if (Void.TYPE.isAssignableFrom(returnClass)) {// 判断返回值是否为void
                // 不处理void的函数调用
                return finMethod.invoke(finObj, finArgs);
            } else if (!Modifier.isPublic(returnClass.getModifiers())) {
                // 处理如果是非public属性,则不进行代理,强制访问会出现IllegalAccessException,比如一些内部类或者匿名类不允许直接访问
                return finMethod.invoke(finObj, finArgs);
            } else if (Modifier.isFinal(returnClass.getModifiers())) {
                // 处理特殊的final类型,目前暂不支持,后续可采用jdk proxy
                return finMethod.invoke(finObj, finArgs);
            } else if (returnClass.isPrimitive() || returnClass.isArray()) {
                // 不处理特殊类型,因为无法使用cglib代理
                return finMethod.invoke(finObj, finArgs);
            } else if (returnClass == Object.class) {
                // 针对返回对象是Object类型,不做代理。没有具体的method,代理没任何意义
                return finMethod.invoke(finObj, finArgs);
            } else {
                Future future = executor.submit(new AsyncLoadCallable() {

                    public Object call() throws Exception {
                        try {
                            return finMethod.invoke(finObj, finArgs);// 需要直接委托对应的finObj(service)进行处理
                        } catch (Throwable e) {
                            throw new AsyncLoadException("future invoke error!", e);
                        }
                    }

                    public AsyncLoadConfig getConfig() {
                        return config;
                    }
                });
                // 够造一个返回的AsyncLoadResult
                AsyncLoadResult result = new AsyncLoadResult(returnClass, future, timeout);
                // 继续返回一个代理对象
                AsyncLoadObject asyncProxy = (AsyncLoadObject) result.getProxy();
                // 添加到barrier中
                if (config.getNeedBarrierSupport()) {
                    AsyncLoadBarrier.addTask((AsyncLoadObject) asyncProxy);
                }
                // 返回对象
                return asyncProxy;
            }

        }

        /**
         * 返回对应的匹配的timeout时间,一定能找到对应的匹配点
         * 
         * @param method
         * @return
         */
        private Long getMatchTimeout(Method method) {
            Map<AsyncLoadMethodMatch, Long> matches = config.getMatches();
            Set<Map.Entry<AsyncLoadMethodMatch, Long>> entrys = matches.entrySet();
            if (entrys != null && !entrys.isEmpty()) {
                for (Iterator<Map.Entry<AsyncLoadMethodMatch, Long>> iter = entrys.iterator(); iter.hasNext();) {
                    Map.Entry<AsyncLoadMethodMatch, Long> entry = iter.next();
                    if (entry.getKey().matches(method)) {
                        return entry.getValue();
                    }
                }
            }

            return config.getDefaultTimeout();
        }
    }

    // =========================== help mehotd =================================

    /**
     * 优先从Repository进行获取ProxyClass,创建对应的object
     * 
     * @return
     */
    private T getProxyInternal() {
        Class proxyClass = AsyncLoadProxyRepository.getProxy(targetClass.getName());
        if (proxyClass == null) {
            Enhancer enhancer = new Enhancer();
            if (targetClass.isInterface()) { // 判断是否为接口,优先进行接口代理可以解决service为final
                enhancer.setInterfaces(new Class[] { targetClass });
            } else {
                enhancer.setSuperclass(targetClass);
            }
            enhancer.setCallbackTypes(new Class[] { AsyncLoadServiceInterceptor.class, AsyncLoadDirect.class,
                    AsyncLoadInterceptor.class });
            enhancer.setCallbackFilter(new AsyncLoadCallbackFilter());
            proxyClass = enhancer.createClass();
            // 注册proxyClass
            AsyncLoadProxyRepository.registerProxy(targetClass.getName(), proxyClass);
        }

        Enhancer.registerCallbacks(proxyClass, new Callback[] { new AsyncLoadServiceInterceptor(),
                new AsyncLoadDirect(), new AsyncLoadInterceptor() });
        try {
            return (T) AsyncLoadReflectionHelper.newInstance(proxyClass);
        } finally {
            // clear thread callbacks to allow them to be gc'd
            Enhancer.registerStaticCallbacks(proxyClass, null);
        }
    }

    // ====================== setter / getter ===========================

    public void setService(T service) {
        this.service = service;
        if (targetClass == null) {
            this.targetClass = (Class<T>) service.getClass();
        }
    }

    public void setConfig(AsyncLoadConfig config) {
        this.config = config;
    }

    public void setExecutor(AsyncLoadExecutor executor) {
        this.executor = executor;
    }

    public void setTargetClass(Class targetClass) {
        this.targetClass = targetClass;
    }

}

 

 

 

  • 大小: 27.6 KB
  • 大小: 9 KB
  • 大小: 49.2 KB
1
1
分享到:
评论
11 楼 LinApex 2015-09-22  
cywhoyi 写道
LinApex 写道
cywhoyi 写道
LinApex 写道
有全部代码不、想研究下?

核心代码就这点,我这周放到blog上

好哒

坑爹呢,这档子事情忘记去整理了

不会吧~~~ 坐等~
10 楼 cywhoyi 2015-09-21  
LinApex 写道
cywhoyi 写道
LinApex 写道
有全部代码不、想研究下?

核心代码就这点,我这周放到blog上

好哒

坑爹呢,这档子事情忘记去整理了
9 楼 LinApex 2015-09-16  
cywhoyi 写道
LinApex 写道
有全部代码不、想研究下?

核心代码就这点,我这周放到blog上

好哒
8 楼 cywhoyi 2015-09-16  
LinApex 写道
有全部代码不、想研究下?

核心代码就这点,我这周放到blog上
7 楼 LinApex 2015-09-16  
有全部代码不、想研究下?
6 楼 cywhoyi 2015-09-14  
LinApex 写道
小心
cywhoyi 写道
LinApex 写道
cywhoyi 写道
LinApex 写道
效率如何?  有失败环节怎么办? 事务控制呢有没有问题??

效率是必然啦,事务方面因为是使用动态期的CGLIB代理的方式


效率肯定会提高? 有用到过正式环境吗?

为了验证正式环境,我们上线了项目,至今一周未出问题,但不能说以后不会出现问题

小心,有空我也试试研究下,多线程并行,但客户端多个请求可能就出现问题了哦

谢谢,我们还算比较严谨的
5 楼 LinApex 2015-09-01  
小心
cywhoyi 写道
LinApex 写道
cywhoyi 写道
LinApex 写道
效率如何?  有失败环节怎么办? 事务控制呢有没有问题??

效率是必然啦,事务方面因为是使用动态期的CGLIB代理的方式


效率肯定会提高? 有用到过正式环境吗?

为了验证正式环境,我们上线了项目,至今一周未出问题,但不能说以后不会出现问题

小心,有空我也试试研究下,多线程并行,但客户端多个请求可能就出现问题了哦
4 楼 cywhoyi 2015-09-01  
LinApex 写道
cywhoyi 写道
LinApex 写道
效率如何?  有失败环节怎么办? 事务控制呢有没有问题??

效率是必然啦,事务方面因为是使用动态期的CGLIB代理的方式


效率肯定会提高? 有用到过正式环境吗?

为了验证正式环境,我们上线了项目,至今一周未出问题,但不能说以后不会出现问题
3 楼 LinApex 2015-08-12  
cywhoyi 写道
LinApex 写道
效率如何?  有失败环节怎么办? 事务控制呢有没有问题??

效率是必然啦,事务方面因为是使用动态期的CGLIB代理的方式


效率肯定会提高? 有用到过正式环境吗?
2 楼 cywhoyi 2015-08-07  
LinApex 写道
效率如何?  有失败环节怎么办? 事务控制呢有没有问题??

效率是必然啦,事务方面因为是使用动态期的CGLIB代理的方式
1 楼 LinApex 2015-08-07  
效率如何?  有失败环节怎么办? 事务控制呢有没有问题??

相关推荐

    人工智能-项目实践-异步调度-异步调度IP免费代理池.zip

    人工智能-项目实践-异步调度-异步调度IP免费代理池 本项目通过爬虫抓取互联网上免费代理网站的IP,并且进行异步检测是否可用,如果可用就放入数据库。定时对数据库中的代理进行维护,然后通过web api的形式供外部...

    大规模虚拟地形数据多线程异步调度算法.pdf

    在这样的背景下,任子健和陈璐的研究成果《大规模虚拟地形数据多线程异步调度算法》为解决该问题提供了新的视角和思路。 文章开篇即阐述了目前虚拟地形环境仿真研究的现状与挑战。在处理较小范围的地形数据时,现有...

    C#操作系统中的进程调度的设计与实现

    本文将深入探讨C#操作系统中的进程调度设计与实现。 首先,我们需要明确的是,C#本身并不直接实现操作系统内核或进程调度,它是.NET Framework的一部分,运行在Windows、Linux、macOS等操作系统之上。然而,开发者...

    网络游戏-基于数据命名为中心的无线传感器网络异步休眠调度方法.zip

    总的来说,这份资料可能详细介绍了如何设计和实现一种结合数据命名和异步休眠调度的方法,以提升网络游戏中的无线传感器网络性能,同时兼顾能源效率。通过这种方式,可以构建更加智能、节能且响应快速的游戏环境,为...

    Go-Seelog是一个原生Go日志库提供了灵活的异步调度过滤和格式化

    首先,让我们深入了解一下Seelog的异步调度机制。在传统的日志处理方式中,日志记录通常会阻塞主线程,影响程序执行效率。而Seelog通过引入异步模式,可以在后台线程处理日志写入操作,避免了这一问题。这意味着即使...

    数据库车辆调度课程(毕业)设计

    数据库车辆调度课程设计是一项综合性的项目,涉及到多个关键领域的知识,包括数据库管理、车辆调度算法以及Web应用开发。以下是对这些关键知识点的详细说明: 1. **数据库管理**:数据库是存储和管理数据的核心工具...

    基于SEDA的异步框架设计与实现

    **基于SEDA的异步框架设计与实现** SEDA(Staged Event-Driven Architecture)是一种软件架构模式,常用于高性能、高并发的应用程序设计。它将系统分解为多个独立的阶段,每个阶段处理特定类型的事件,并通过队列...

    基于工作流物流调度系统的设计与实现

    ### 基于工作流物流调度系统的设计与实现 #### 一、引言 随着信息技术的发展,企业规模不断扩大,信息资源以惊人的速度增长,这促使企业需要建立一个高效的、能够适应异构分布式环境的信息管理系统。工作流技术...

    异步scoket 通讯 源码。(C#服务端 和客户端源码)

    为了提高性能,可以使用多个Socket实例并行处理多个连接,或者使用线程池来调度异步操作。另外,合理设置缓冲区大小,避免频繁的小规模数据传输,也是优化的关键。 8. **示例代码** 服务端示例: ```csharp ...

    基于Django+Celery队列实现的集中化异步任务调度系统,

    本项目“基于Django+Celery队列实现的集中化异步任务调度系统”旨在提供一个易于集成的框架,帮助开发者快速在Django项目中实现异步任务处理,降低业务代码的复杂性。 Django是Python领域广泛使用的Web框架,以其...

    基于Spring打造简单高效通用的异步任务处理系统

    总结来说,构建基于Spring的异步任务处理系统是一个综合了数据库设计、任务调度、并发执行和故障恢复的过程。通过合理的设计和配置,我们可以实现一个既高效又易于维护的异步任务处理平台,满足复杂业务场景的需求,...

    论文研究-面向流体系的细粒度异步访存调度.pdf

    针对异步访存调度机制,设计一种细粒度化的调度方案以提升系统性能。该机制引入信号量和自旋锁,由异构核间协作运作,以实现对流级调度的局部加速。通过在一组测试程序集以及在对应平台上进行的实验,评估了引入该...

    基于UVM的异步fifo验证平台设计

    在本文中,我们将深入探讨如何基于UVM(Universal Verification Methodology)设计一个异步FIFO(First-In-First-Out)的验证平台。FIFO是一种常用的数据传输结构,它按照数据进入的顺序进行输出,而异步FIFO则涉及...

    基于Web的CAD异步协同设计的方案研究与实现.pdf

    为了应对这一挑战,研究并实现了基于Web的CAD异步协同设计方案,本文将详细探讨该方案的设计理念及实现方法,并阐述其应用系统中的具体实现。 首先,基于Web的CAD异步协同设计方案采用了B/S(Browser/Server)架构...

    调度绞车减速装置的运动仿真

    调度绞车减速装置的运动仿真是一项复杂的技术活动,它涉及到机械设计、虚拟装配、仿真分析等多方面内容。本次知识点梳理将围绕摆线针轮减速器、Pro/E软件、运动仿真等关键词展开。 首先,摆线针轮减速器是工业中...

    Twisted与异步编程入门

    2. **线程模型**:每个任务在一个独立的线程中执行,线程由操作系统调度。多线程允许并发执行,但线程间的通信和同步复杂,可能导致竞态条件和死锁等问题。 3. **异步模型**:在单线程中,任务交错执行,每个任务...

    网络游戏-在同一个网络上调度同步和异步分组的方法和系统.zip

    综上所述,网络游戏中的同步与异步调度是一项复杂的工程,需要综合考虑网络特性、游戏逻辑和玩家体验。通过对这些策略和技术的深入理解和应用,开发者能够创建出更加稳定、流畅的网络游戏环境。这份资料的详细分析和...

Global site tag (gtag.js) - Google Analytics