`

详解应对平台高并发的分布式调度框架TBSchedule

阅读更多
    
    

     tbschedule是一款非常优秀的高性能分布式调度框架,非常高兴能分享给大家。这篇文章是我结合多年tbschedule使用经验和研读三遍源码的基础上完成的。我写这篇文章的目的一是出于对tbschedule的一种热爱,二是现在是一个资源共享、技术共享的时代,希望把它展现给大家(送人玫瑰,手留余香),能给大家的工作带来帮助。

    一、tbschedule初识

        时下互联网和电商领域,各个平台都存在大数据、高并发的特点,对数据处理的要求越来越高,既要保证高效性,又要保证安全性、准确性。tbschedule的使命就是将调度作业从业务系统中分离出来,降低或者是消除和业务系统的耦合度,进行高效异步任务处理。其实在互联网和电商领域tbschedule的使用非常广泛,目前被应用于阿里巴巴、淘宝、支付宝、京东、聚美、汽车之家、国美等很多互联网企业的流程调度系统。
        在深入了解tbschedule之前我们先从内部和外部形态对它有个初步认识,如图1.1、图1.2。



        从tbschedule的内部形态来说,与他有关的关键词包括批量任务、动态扩展、多主机、多线程、并发、分片……,这些词看起来非常的高大上,都是时下互联网技术比较流行的词汇。从tbschedule的外部架构来看,一目了然,宿主在调度应用中与zookeeper进行通信。一个框架结构是否是优秀的,从美感的角度就可以看出来,一个好的架构一定是隐藏了内部复杂的原理,外部视觉上美好的,让用户使用起来简单易懂。 

    二、tbschedule原理
        为什么TBSchedule值得推广呢?

       传统的调度框架spring task、quartz也是可以进行集群调度作业的,一个节点挂了可以将任务漂移给其他节点执行从而避免单点故障,但是不支持分布式作业,一旦达到单机处理极限也会存在问题。
       elastic-job支持分布式,是一个很好的调度框架,但是开源时间较短,还没有经历大范围市场考验。
       Beanstalkd基于C语言开发,使用范围较小,无法引入到php、java系统平台。

        tbschedule到底有多强大呢?我对tbschedule的优势特点进行了如下总结:
        1、支持集群、分布式
        2、灵活的任务分片
        3、动态的服务扩容和资源回收
        4、任务监控支持
        tbschedule支持cluster,可以宿主在多台服务器多个线程组并行进行任务调度,或者说可以将一个大的任务拆成多个小任务分配到不同的服务器。tbschedule的分布式机制是通过灵活的sharding方式实现的,比如可以按所有数据的ID按10取模分片(分片规则如图2.1)、按月份分片等等,根据不同的需求,不同的场景由客户端配置分片规则。我们知道spring task、quarz也是可以进行集群调度作业的,一个节点挂了可以将任务漂移给其他节点执行从而避免单点故障,但是不支持分布式作业,一旦达到单机处理极限也会存在问题,这个就是tbschedule的优势。然后就是tbschedule的宿主服务器可以进行动态扩容和资源回收,这个特点主要是因为它后端依赖的zookeeper,这里的zookeeper对于tbschedule来说是一个nosql,用于存储数据,它的数据结构类似文件系统的目录结构,它的节点有临时节点、持久节点之分。调度引擎上线后,随着业务量数据量的增多,当前cluster可能不能满足目前的处理需求,那么就需要增加服务器数量,一个新的服务器上线后会在zk中创建一个代表当前服务器的一个唯一性路径(临时节点),并且新上线的服务器会和zk有长连接,当通信断开后,节点会自动摘除。tbschedule会定时扫描当前服务器的数量,重新分配任务。tbschedule不仅提供了服务端的高性能调度服务,还提供了一个scheduleConsole war随着宿主应用的部署直接部署到服务器,可以通过web的方式对调度的任务、策略进行监控,实时更新。



        是不是已经对tbschedule稍微了有些好感呢?我们接着往下看。
        tbschedule提供了两个核心组件ScheduleServer、TbscheduleManagerFactory和两类核心接口IScheduleTaskDeal、IScheduleTaskDealSingle、IScheduleTaskDealMuti,这两部分是客户端研发的关键部分,是使用tbschedule必须要了解的。
        ScheduleServer即任务处理器,的主要作用是任务和策略的管理、任务采集和执行,由一组工作线程组成,这组工作线程是基于队列实现的,进行任务抓取和任务处理(two mode)。每个任务处理器和zookeeper有一个心跳通信连接,用于检测server的状态和进行任务动态分配,举个例子,比如3服务器的worker集群执行出票消息生成任务,这对这个任务类型每台服务器可以配置一个ScheduleSever(即一个线程组),也可以配置两个线程组,相当于6台服务器在执行这个任务。当某台服务器宕机或者其他原因与zk通信断开时,他的任务将被其他服务器接管。ScheduleServer参数定义如图2.2



        在这些参数中taskItems是一个非常重要的属性,是客户单可以自由发挥的地方,是任务分片的基础,比如我们处理一个任务可以根据ID按10取模,那么任务项就是0-9,3台服务器分别拿到4、 3、 3个任务项,服务器的上下线都会对任务项进行重新分配。任务项是进行任务分配的最小单位。一个任务项只能由一个ScheduleServer来进行处理,但一个Server可以处理任意数量的任务项。这就是刚才我们说的分片特性。
        调度服务器TbscheduleManagerFactory的主要工作zookeeper连接参数配置和zookeeper的初始化、调度管理。
        两类核心接口是需要被我们定义的目标任务实现的,根据自己的需要进行任务采集(重写selectTasks方法)和任务执行(重写execute方法),这类接口是客户端研发自由发挥的地方。
        接下来我们深入了解下tbschedule,看看它的内部是如何实现的。下面流程图是我花了很多心血通过一周时间画出来的,基本是清晰的展现了tbschedule内部的执行流程以及每个步骤zookeer节点路径和数据的变化。因为图中的注释已经描述的很详细了,每个节点右侧是zk的信息(数据结构见图2.3),这里就不再做过多的文字描述了,有任何建议或者不明白的地方可以找我交流。










        Tbschedule还有个强大之处是它提供了两种处理器模式模式:
        1、SLEEP模式
        当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。如果是,则自己休眠;如果其它线程都已经因为没有任务进入休眠,当前线程是最后一个活动线程的时候,就调用业务接口,获取需要处理的任务,放入任务池中,同时唤醒其它休眠线程开始工作。
        2、NOTSLEEP模式
        当一个线程任务处理完毕,从任务池中取不到任务的时候,立即调用业务接口获取需要处理的任务,放入任务池中。
        SLEEP模式内部逻辑相对较简单,如果遇到大任务需要处理较长时间,可能会造成其他线程被动阻塞的情况。但其实生产环境一般都是小而快的任务,即使出现阻塞的情况ScheduleConsole也会及时的监控到。NOTSLEEP模式减少了线程休眠的时间,避免了因大任务造成阻塞的情况,但为了避免数据被重复处理,增加了CPU在数据比较上的开销。TBSchedule默认是SLEEP模式。

        到目前为止我相信大家对tbschedule有了一个深刻的了解,心中的疑雾逐渐散开了。理论是实践的基础,实践才是最终的目的,下一节我们将结合理论知识进行tbschedule实战.

    三、tbschedule实战

        在项目中使用tbschedule需要依赖zookeeper、tbschedule
        zookeeper依赖:

   
<dependency> 		  
<groupId>org.apache.zookeeper</groupId
<artifactId>zookeeper</artifactId> 		
<version>3.4.6</version> 	
</dependency>
    


       tbschedule依赖:

  
<dependency>
<groupId>com.taobao.pamirs.schedule</groupId
<artifactId>tbschedule</artifactId>
<version>3.3.3.2</version>
</dependency>
   


        tbschedule有三种引入方式:
        1、通过ScheduleConsole引入
        Tbschedule随着宿主调度应用部署到服务器后,可以通过web浏览器的方式访问其提供监控平台。
        第一步,初始化zookeeper



        第二步,创建调度策略



        第三步,创建调度任务



        第四步,监控调度任务



        2、通过原生java引入

    
        // 初始化Spring
        ApplicationContext ctx = new FileSystemXmlApplicationContext(
                "spring-config.xml");

        // 初始化调度工厂
        TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();

        Properties p = new Properties();
        p.put("zkConnectString", "127.0.0.1:2181");
        p.put("rootPath", "/taobao-schedule/train_worker");
        p.put("zkSessionTimeout", "60000");	
        p.put("userName", "train_dev");
        p.put("password", " train_dev ");
        p.put("isCheckParentPath", "true");

        scheduleManagerFactory.setApplicationContext(ctx);
        
        scheduleManagerFactory.init(p); 

       // 创建任务调度任务的基本信息
        String baseTaskTypeName = "DemoTask";
	ScheduleTaskType baseTaskType = new ScheduleTaskType();
        baseTaskType.setBaseTaskType(baseTaskTypeName);
        baseTaskType.setDealBeanName("demoTaskBean");
        baseTaskType.setHeartBeatRate(10000);
        baseTaskType.setJudgeDeadInterval(100000);
        baseTaskType.setTaskParameter("AREA=BJ,YEAR>30");  baseTaskType.setTaskItems(ScheduleTaskType.splitTaskItem(
				"0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4}," +
				"4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8}," +
				"8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));
	baseTaskType.setFetchDataNumber(500);
	baseTaskType.setThreadNumber(5);
	this.scheduleManagerFactory.getScheduleDataManager()
		.createBaseTaskType(baseTaskType);
	log.info("创建调度任务成功:" + baseTaskType.toString());

	// 创建任务的调度策略
	String taskName = baseTaskTypeName;
	String strategyName =taskName +"-Strategy";
	try {
		this.scheduleManagerFactory.getScheduleStrategyManager()
				.deleteMachineStrategy(strategyName,true);
	} catch (Exception e) {
		e.printStackTrace();
	}
	ScheduleStrategy strategy = new ScheduleStrategy();
	strategy.setStrategyName(strategyName);
	strategy.setKind(ScheduleStrategy.Kind.Schedule);
	strategy.setTaskName(taskName);
	strategy.setTaskParameter("china");
		
	strategy.setNumOfSingleServer(1);
	strategy.setAssignNum(10);
	strategy.setIPList("127.0.0.1".split(","));
	this.scheduleManagerFactory.getScheduleStrategyManager()
			.createScheduleStrategy(strategy);
	log.info("创建调度策略成功:" + strategy.toString());




        3、通过spring容器引入

<!-- 初始化zookeeper -->  
<bean id="scheduleManagerFactory"
		    class="com.xx.TBScheduleManagerFactory" init-method="init">
<property name="zkConfig">
<map>
	<entry key="zkConnectString" value="127.0.0.1:2181" />
	<entry key="rootPath" value="/taobao-schedule/train_worker" />
	<entry key="zkSessionTimeout" value="60000" />
	<entry key="userName" value="train_dev" />
	<entry key="password" value="train_dev" />
	<entry key="isCheckParentPath" value="true" />
</map>
</property>	
</bean>
<!-- 配置调度策略 凌晨1点到3点执行 -->
<bean id="abstractDemoScheduleTask" class="com.xx.core.tbschedule.InitMyScheduleTask" abstract="true">
<property name="scheduleTaskType.heartBeatRate" value="10000" />
<property name="scheduleTaskType.judgeDeadInterval" value="100000" />
<property name="scheduleTaskType.permitRunStartTime" value="0 0 1 * * ?"/> 
<property name="scheduleTaskType.permitRunEndTime" value="0 0 3 * * ?"/>  
<property name="scheduleTaskType.taskParameter" value="AREA=BJ,YEAR>30" />
<property name="scheduleTaskType.sleepTimeNoData" value="60000"/>
<property name="scheduleTaskType.sleepTimeInterval" value="60000"/>
<property name="scheduleTaskType.fetchDataNumber" value="500" />
<property name="scheduleTaskType.executeNumber" value="1" />
<property name="scheduleTaskType.threadNumber" value="5" />
<property name="scheduleTaskType.taskItems"> 
<list>
		<value>0:{TYPE=A,KIND=1}</value>
		<value>1:{TYPE=A,KIND=2}</value>
		<value>2:{TYPE=A,KIND=3}</value>
		<value>3:{TYPE=A,KIND=4}</value>
		<value>4:{TYPE=A,KIND=5}</value>
		<value>5:{TYPE=A,KIND=6}</value>
		<value>6:{TYPE=A,KIND=7}</value>
		<value>7:{TYPE=A,KIND=8}</value>
		<value>8:{TYPE=A,KIND=9}</value>
		<value>9:{TYPE=A,KIND=10}</value>
	</list>
</property>
<property name="scheduleStrategy.kind" value="Schedule" />
<property name="scheduleStrategy.numOfSingleServer" value="1" />
<property name="scheduleStrategy.assignNum" value="10" />	
	<property name="scheduleStrategy.iPList">
	    <list>
		    <value>127.0.0.1</value>
	    </list>
	</property>
	</bean>        
<!-- 配置调度任务 -->
<bean id="demoTask" class="com.xx.worker.task.DemoTask" parent="abstractDemoScheduleTask">
<property name="scheduleTaskType.baseTaskType" value="demoTask" />
<property name="scheduleTaskType.dealBeanName" value="demoTaskBean" />
<property name="scheduleStrategy.strategyName" value="demoTaskBean-Strategy" />
<property name="scheduleStrategy.taskName" value="demoTaskBean" />
</bean>	




    调度任务具体实现 DemoTask.java

   

 /**
 * DemoTask任务类
 */
public class DemoTask  mplements
		IScheduleTaskDealSingle,TScheduleTaskDeal {

 /**
  * 数据采集
  * @param taskItemNum--分配的任务项 taskItemList--总任务项 
  *        eachFetchDataNum--采集任务数量
  */
	@Override
	public List<DemoTask> selectTasks(String taskParameter,
			String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList,
			int eachFetchDataNum) throws Exception {
		List<DemoTask> taskList = new LinkedList<DemoTask>();
		//客户端根据条件进行数据采集start
		
		//客户端根据条件进行数据采集end
		return rt;
	}
/**
  * 数据处理
  */
	@Override
	public boolean execute(DemoTask task, String ownSign)
			throws Exception {
 		//客户端pop任务进行处理start
		
		//客户端pop任务进行处理end
		return true;
	}
}

  

        其实我们看对于tbscchedule客户端的使用非常简单,初始化zk、配置调度策略、调度任务,对调度任务进行实现,就这几个步骤。现在可以庆祝下了,你又掌握了一个优秀开源框架的设计思想和使用方式。

    四、tbschedule挑战

        任何事物都是没有最好只有更好,tbschedule也一样,虽然它现在已经很完美了,我们不能放弃对更完美的追求。阿里团队可以在下面几个方面进行优化。
        1、ScheduleConsole监控页面优化,目前ScheduleConsole监控页面过于简单,需完善UI设计,提高用户体验。
        2、Zookeeper集群自动切换,避免zk服务的集群点多故障
        3、原生zookeeper操作替换为curator,Curator对ZooKeeper进行了一次包装,对原生ZooKeeper的操作做了大量优化,Client和Server之间的连接可能出现的问题处理等等,可以进一步提高TBSchedule的高可用。
        4、帮助文档较少,网上的资料基本是千篇一律,希望有更多的爱好者加入进来。

   至此,我们已经完成了对tbschedule的全部介绍,尽快使用起来吧!

心灵鸡汤

       优秀的架构都很相似,垃圾的架构各有各的垃圾




我的博客:http://mycolababy.iteye.com/

【未经作者不得转载,转载请注明出处http://mycolababy.iteye.com】

     






   






  • 大小: 71.3 KB
  • 大小: 57.9 KB
  • 大小: 63.4 KB
  • 大小: 133.2 KB
  • 大小: 325.1 KB
  • 大小: 45.4 KB
  • 大小: 113 KB
  • 大小: 146.2 KB
  • 大小: 90 KB
  • 大小: 402 KB
  • 大小: 128.5 KB
  • 大小: 220.3 KB
  • 大小: 32.2 KB
分享到:
评论
2 楼 chengpeng_2015 2016-04-28  
 
1 楼 Tyrion 2016-04-27  
好文

相关推荐

    duboo分布式狂框架demo

    【Duboo分布式框架详解】 Duboo是一个基于Apache Dubbo构建的高性能、轻量级的Java分布式服务框架。它旨在提高微服务架构中的服务治理效率,包括服务注册与发现、负载均衡、容错处理、监控与日志等核心功能。本项目...

    Java-高并发分布式淘淘商城.zip

    《Java高并发分布式淘淘商城系统详解》 在数字化时代,电商系统的发展日新月异,其中,"淘淘商城"作为一个模拟真实电商平台的案例,其背后的技术架构尤其值得我们深入探讨。本篇将主要围绕Java高并发分布式系统展开...

    Hulk-高性能分布式事务框架(TCC模式基于SpringCloud)

    《Hulk:高性能分布式事务框架基于SpringCloud的TCC模式详解》 在现代企业级应用中,事务处理是核心功能之一,确保数据的一致性和完整性至关重要。Hulk是一款优秀的高性能分布式事务框架,它以TCC(Try-Confirm-...

    java毕业设计&课设-高并发分布式淘淘商城(视频+源码+资料).doc

    ### Java毕业设计&课设-高并发分布式淘淘商城...总之,这个Java高并发分布式淘淘商城项目不仅能够帮助学生掌握一系列先进的Java开发技术和框架,还能够在实践中培养解决问题的能力,为未来的职业生涯打下坚实的基础。

    TBSchedule淘宝开源定时任务调度框架客户端demo

    1. **核心功能**:TBSchedule主要负责任务的调度与执行,支持分布式环境,可以处理大量并发的定时任务,具备高可用性和容错性。 2. **设计原则**:采用中心式调度策略,客户端注册任务到调度中心,由调度中心统一...

    基于分布式调度架构的录波主站系统设计.pdf

    本文主要讲述了基于分布式调度架构的录波主站系统设计,并从其架构优化设计、面临的挑战、性能提升等方面进行了深入探讨。以下为文中提到的知识点详解: 1. 录波主站系统的重要性:录波主站系统是确保电力系统安全...

    Quartz开源高性能作业调度框架详解与集成案例

    内容概要:本文档全面介绍了Quartz,一款开源的高性能作业调度框架。它不仅详细解析了Quartz的核心组件及其任务存储方式,还分享了Spring Boot环境下集成Quartz的具体步骤以及在多点集群环境中使用数据库存储的任务...

    CsGo并发流程控制框架

    CsGo并发流程控制框架结合了C#语言的特性和.NET Framework的强大支持,为开发者提供了丰富的工具和策略来应对高并发环境的挑战。通过对`ConsoleTest`项目的分析,我们可以更好地理解如何在实际开发中应用这些并发...

    分布式自动化测试平台

    ### 分布式自动化测试平台构建知识点 #### 一、分布式自动化测试平台概述 分布式自动化测试平台是一种能够跨多台计算机并行执行自动化测试任务的技术体系。这种架构的主要优点在于能够提高测试效率,减少单点故障...

    Python并行分布式框架Celery详解

    Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。 在 Python 中定义 Celery 的...

    Python-CharmPy是一个通用的并行和分布式编程框架

    **Python-CharmPy框架详解** Python-CharmPy是Python编程领域的一个强大工具,它为开发者提供了通用的并行和分布式编程解决方案。这个框架的核心优势在于其简单易用的API,它允许用户通过可迁移的Python对象和远程...

    基于tcc的分布式事务框架源码.zip

    Hmily是一个高性能、轻量级的Java TCC分布式事务框架。它的核心特性包括: 1. **灵活的TCC模式**:Hmily允许开发者自定义Try、Confirm和Cancel方法,适应各种复杂的业务场景。 2. **高并发处理**:设计时考虑了高...

    The Definitive Guide to Terracotta 分布式JVM框架

    ### 《Terracotta 分布式JVM框架权威指南》核心知识点详解 #### 一、Terracotta概述 - **定义与背景**:Terracotta是一个高性能、易于使用的分布式内存解决方案,它允许开发者轻松地创建可扩展的Java应用程序。...

    ubuntu fastDFS 文件分布式存储框架配置

    以下是关于"ubuntu fastDFS 文件分布式存储框架配置"的知识点详解: 1. **系统环境准备**:首先确保你的Ubuntu系统是最新的,可以通过`sudo apt-get update && sudo apt-get upgrade`命令进行更新。另外,需要安装...

    CTG-BSS_分布式WEB框架_操作手册V0.4 共58页.pdf

    1.1. **背景**:随着互联网业务的快速发展,单一服务器已无法满足高并发、大流量的需求,分布式WEB框架应运而生,它能够将应用程序分解为多个独立的服务,实现水平扩展和故障隔离。 1.2. **目的**:手册的目标是为...

    java高并发程序设计(原版电子书)

    《Java高并发程序设计》是一本深入探讨Java平台上的并发编程技术的专业书籍,由葛一鸣等人编著。这本书旨在帮助读者理解并掌握在高并发环境下编写高效、稳定且可扩展的Java应用程序的关键技巧和最佳实践。以下是该书...

    Linkis分布式服务框架 v1.3.2.zip

    《Linkis分布式服务框架v1.3.2详解》 Linkis是一款强大的分布式服务框架,专为大数据开发、治理和分析而设计。在版本v1.3.2中,它进一步提升了性能和稳定性,提供了更加丰富的功能,适用于各种复杂的业务场景。本文...

    Dubbo阿里巴巴分布式服务框架

    《Dubbo:阿里巴巴分布式服务框架详解》 Dubbo,源自阿里巴巴,是一款高性能、轻量级的开源Java RPC框架,它提供了三大核心能力:面向接口的远程方法调用、智能容错和负载均衡、以及服务自动注册与发现。作为企业级...

Global site tag (gtag.js) - Google Analytics