`
qingshizhi
  • 浏览: 4556 次
社区版块
存档分类
最新评论

基于数据库锁实现springtask的集群

 
阅读更多
   目前项目使用大量spring-task,spring-task有个足的地方是缺失对集群的支持。quartz可以支持定时任务集群,我们项目没有用,所以就自己实现了。我们设计的定时任务有三类。1、节点间不允许并发,2、节点间允许并发,节点内不允许并发,3.节点间允许并发,节点内允许多线程并发。

首先实现任务接口
public interface Task {

	/**
	 * 定时任务被调用入口,此方法中异常应捕获,不应往外面抛出
	 */
    public void excut();
	
    /**
     * 定时业务任务实现方法
     * @throws Exception
     */
	public void doExcut() throws Exception;
	
	
	/**
	 * 是否允许多节点并发运行
	 * @return
	 */
	public boolean isConcurrent();
	
	
	/**
	 * 是否是单节中配置了多线程执行任务
	 * @return
	 */
	public boolean isMulitiThread();
	
	
	/**
	 * 设置任务数据分片信息,当多节点并发取数时,需通过此方法设置分片信息
	 * @param dataSliceInfo  每个节点的取数分片信息
	 */
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo);
	
	/**
	 * 获取单节点下,任务的并发数目 。当节点下任务为多线程并发时,返回此值
	 * @return
	 */
	public int getLocalMulitiThreadNum();
}



创建一个任务基类
public abstract class ClusterBaseTask implements Task {

	protected Logger logger = Logger.getLogger(getClass());
	
	
	@Autowired
	ClusterTaskExcutor excutor;
	
	/**
	 * 暴露在外的方法
	 * @throws Exception 
	 */
	@Override
	public void excut() {
		try {
			excutor.excute(this);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	
	@Override
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo) {

	}

	@Override
	public int getLocalMulitiThreadNum() {
		return 0;
	}

	
	

}



创建一个 ClusterTaskExcutor 进行任务控制
public interface ClusterTaskExcutor {

	
	void excute(Task task) throws Exception;
	
}


实现类
@Service
public class ClusterTaskExcutorImpl implements ClusterTaskExcutor{

	protected Logger logger = Logger.getLogger(getClass());
	
	@Autowired
    TaskLockerDAO taskLockerDAO;
	
	@Autowired
	TaskRuntimeInfoService taskRuntimeInfoService;
	
	//spring中具有线程池管理的调度
	@Autowired
	SchedulingTaskExecutor schedulingTaskExecutor;
	
	
	@Autowired
	HeartbeatService  heartbeatService;
	
	
	@Autowired
	private DataSourceTransactionManager txManager;
	
	

	@Override
	public void excute(final Task task) throws Exception {
		
		logger.info("ClusterTaskExcutorImpl begin ");
		
		if(task.isConcurrent()){//节点间可以并发执行
			
			if(task.isMulitiThread()){//一个节点下多线程执行任务
				doConcurrent(task);
			}else{//一个节点下单线程执行
				task.doExcut();
			}
			
		}else{//节点间互斥执行
	
			doSync(task);
		}
		
		logger.info("ClusterTaskExcutorImpl end ");
		
	}


	private void doConcurrent(final Task task) throws Exception {
		
	  
		DataSliceInfo dataSliceInfo=getDataSliceInfo();
		
		if(dataSliceInfo!=null){//获取到分片信息
			task.setDataSliceInfo(dataSliceInfo);
			
			int size=task.getLocalMulitiThreadNum();
			
			List<Future<?>> futures = new ArrayList<Future<?>>(size);
			
			for (int i = 0; i < size; i++) {
				
				RunableTask runableTask=new RunableTask(task);
				Future<?> f=schedulingTaskExecutor.submit(runableTask);
				futures.add(f);
			}
			
			//主线程等待,让所有线程执行完成后,主线程才执行
			for (Future<?> f : futures) {
				if (!f.isDone()) {
					try {
						f.get();
					} catch (CancellationException ignore) {
					} catch (ExecutionException ignore) {
					}
				}
			}
		}else{//未获取到分片信息
			logger.error("未获取到分片信息,任务退出执行");
		}
		
		
		
	}


	/**
	 * 获取本机的分片信息
	 * @return
	 * @throws UnknownHostException
	 * @throws Exception
	 */
	private DataSliceInfo getDataSliceInfo() {
		String nodeIp=null ;
		try {
			DataSliceInfo dataSliceInfo=new DataSliceInfo();
			List<String> ipList=heartbeatService.getAliveHostList();
			nodeIp= InetAddress.getLocalHost().getHostAddress();
			int index=MapUtil.getHashIndex(nodeIp, ipList);
			int size=ipList.size();
			dataSliceInfo.setIndex(index);
			dataSliceInfo.setSize(size);
			return dataSliceInfo;
		} catch (UnknownHostException e) {
			logger.error("getDataSliceInfo错误", e);
			return null;
		} catch (BusinessServiceException e) {
			logger.error("getDataSliceInfo错误;ip:"+nodeIp+"未激活");
			return null;
		}
	}


	/**
	 * 方法同步执行
	 * @param task
	 * @throws InterruptedException
	 */
	private void doSync(final Task task) throws InterruptedException {
		//手动提交事务
		DefaultTransactionDefinition def = new DefaultTransactionDefinition();
		def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 事物隔离级别,开启新事务
		TransactionStatus txStatus = txManager.getTransaction(def); // 获得事务状态
		
		try {
			String taskName=task.getClass().getName();
			if(taskLockerDAO.lockTask(taskName)){//获取任务锁
				
				final TaskRuntimeInfoDTO runtimeInfoDTO = getTaskRunTimeInfo(taskName);
				
				
				Thread thread=new Thread(new Runnable() {
					
					@Override
					public void run() {
						
						taskRuntimeInfoService.beginTask(runtimeInfoDTO);
						try {
							task.doExcut();
						} catch (Exception e) {
							logger.error(e.getMessage(), e);
						}finally{
							taskRuntimeInfoService.endTask(runtimeInfoDTO);
						}
					}
				});
				thread.start();
				thread.join();
			}else{
				logger.warn("任务:["+taskName+"],已经在执行,此线程退出执行");
			}
			
		} finally{
			//操作完成后手动提交事务
			txManager.commit(txStatus);
		}
	}


	private TaskRuntimeInfoDTO getTaskRunTimeInfo(String taskName)
			 {
		
		final TaskRuntimeInfoDTO   runtimeInfoDTO=new TaskRuntimeInfoDTO();
		runtimeInfoDTO.setcTaskName(taskName);
		String nodeIp;
		try {
			nodeIp = InetAddress.getLocalHost().getHostAddress();
		} catch (UnknownHostException e) {
			nodeIp="UnknownHost";
			logger.error("获取IP地址失败", e);
		}
		runtimeInfoDTO.setcRunNodeIp(nodeIp);
		runtimeInfoDTO.setcUpdCde(nodeIp);
		return runtimeInfoDTO;
	}

}


数据库表设计

create table TASK_LOCKER
(
  c_task_name VARCHAR2(256) not null
);
-- Add comments to the table 
comment on table TASK_LOCKER
  is '定时任务locker';
-- Add comments to the columns 
comment on column TASK_LOCKER.c_task_name
  is '任务名称';
-- Create/Recreate primary, unique and foreign key constraints 
alter table TASK_LOCKER
  add constraint TASK_LOCKER_PK_ID primary key (C_TASK_NAME);


-- Create table
create table TASK_RUNTIME_INFO
(
  c_task_name   VARCHAR2(256) not null,
  c_is_run      VARCHAR2(1) not null,
  c_run_node_ip VARCHAR2(256),
  c_crt_cde     VARCHAR2(128) not null,
  t_crt_date    DATE not null,
  c_upd_cde     VARCHAR2(128) not null,
  t_upd_date    DATE not null
);
-- Add comments to the table 
comment on table TASK_RUNTIME_INFO
  is '定时任务运行情况表';
-- Add comments to the columns 
comment on column TASK_RUNTIME_INFO.c_task_name
  is '任务名称';
comment on column TASK_RUNTIME_INFO.c_is_run
  is '是否正在运行';
comment on column TASK_RUNTIME_INFO.c_run_node_ip
  is '运行此任务的节点ip,此任务正在运行时有此值';
comment on column TASK_RUNTIME_INFO.c_crt_cde
  is '创建者';
comment on column TASK_RUNTIME_INFO.t_crt_date
  is '创建时间';
comment on column TASK_RUNTIME_INFO.c_upd_cde
  is '更新者';
comment on column TASK_RUNTIME_INFO.t_upd_date
  is '更新时间';
-- Create/Recreate primary, unique and foreign key constraints 
alter table TASK_RUNTIME_INFO
  add constraint TASK_RUNTIME_INFO_PK_ID primary key (C_TASK_NAME);


获取数据库sql
SELECT 1 FROM  task_locker t WHERE t.c_task_name=#taskName# FOR UPDATE NOWAIT


1、节点间不允许并发定时任务
@Service
public class ClusterTaskMock extends ClusterBaseTask implements TaskMock {

	
	
	@Override
	public void doExcut() throws Exception {
		
		logger.info("doExcut() begin");
		
		Thread.currentThread().sleep(60*1000);
		
		logger.info("doExcut() end");
		
	}

	@Override
	public boolean isConcurrent() {
		return false;
	}

	@Override
	public boolean isMulitiThread() {
		return false;
	}

	
}



2、节点间可以并发定时任务
@Service
public class ConcurrentClusterTaskMock extends ClusterBaseTask implements ConcurrentTaskMock {

	
	
	@Override
	public void doExcut() throws Exception {
		
		logger.info("doExcut() begin");
		
		System.out.println(Thread.currentThread().isDaemon());
		Thread.currentThread().sleep(60*1000);
		
		logger.info("doExcut() end");
		
	}

	@Override
	public boolean isConcurrent() {
		return   true;
	}

	@Override
	public boolean isMulitiThread() {
		return true;
	}

	@Override
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo) {
		
	}

	@Override
	public int getLocalMulitiThreadNum() {
		
		return 5;
	}
	


DataSliceInfo是多线程执行任务时,我们采用对任务id取模的方法分配每个线程所需要执行的任务,避免了在不加锁情况下,多个线程取到相同的任务

public class DataSliceInfo {

	/**
	 * 总分片数
	 */
	private Integer size;
	
	/**
	 * 分片下标
	 */
	private Integer index;
	

	public Integer getSize() {
		return size;
	}

	public void setSize(Integer size) {
		this.size = size;
	}

	public Integer getIndex() {
		return index;
	}

	public void setIndex(Integer index) {
		this.index = index;
	}
	
	
	
	
}

分享到:
评论

相关推荐

    spring + quartz 集群配置

    java + quartz实现定时任务,实现集群配置,在集群环境下多节点运行定时Quartz定任务,就会存在重复处理任务的现象,为解决这一问题,下面我将介绍使用 Quartz 的 TASK ( 12 张表)实例化到数据库,基于数据库自动...

    Spring集群整合Quartz

    本文将深入探讨如何在Spring集群环境中整合Quartz,实现高可用、可扩展的定时任务解决方案。 **1. Spring与Quartz的集成** Spring通过Spring Job和Spring Task模块提供了轻量级的任务调度能力,但当面临复杂定时...

    SpringCloudSchedule定时任务

    本文将深入探讨如何使用Spring Cloud Schedule与MyBatis相结合,实现对MySQL数据库的读写操作,并设定按照间隔时间或固定时间执行的任务。 首先,我们要了解Spring Cloud Schedule的基本概念。它是基于Spring ...

    基于Java框架开发的WMS管理系统+完整源码+数据库备份

    9. **容器化和微服务**:可能采用了Docker进行应用容器化,Kubernetes或Docker Compose进行集群管理,以及Spring Cloud或Netflix OSS实现微服务架构。 为了运行这个WMS管理系统,你需要具备Java开发环境(JDK),一...

    spring-cloud模块spring-boot微服务 mysql数据同步到elasticsearch 实时同步

    Spring Cloud提供了一套完整的解决方案,用于构建分布式系统中的配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态等。而Spring Boot则简化了创建独立...

    分布式任务调度框架 集合

    基于zookeeper+spring task的分布式任务调度组件,非常小巧,无需任何修改就可以使spring task具备分布式特性,确保所有任务在集群中不重复,不遗漏的执行。 5.Quartz 官方地址:...

    java网上企业自动化办公系统

    7. **任务调度**:Quartz或Spring Task用于定时任务的调度,例如定期备份、数据统计等。 8. **日志管理**:Log4j、SLF4J或Logback用于记录系统日志,便于排查问题和性能分析。 9. **性能优化**:通过缓存技术如...

    基于Java的办公自动化管理系统的设计与实现.rar

    例如,使用JasperReports或iText库可以创建定制化的报表,而JavaMail API则可实现邮件服务的集成,Quartz或Spring Task则可进行定时任务的调度。 在数据库层面,MySQL或Oracle常常被选为后端存储,它们提供稳定的...

    低价机票监测系统小程序源码数据库.zip

    7. **数据同步**:低价机票的实时监测需要后台定时或实时地从航空公司的接口抓取数据,并更新到数据库中,这涉及到了后台的任务调度和异步处理技术,如Quartz或Spring Task。 8. **安全性**:项目应考虑安全方面,...

    springcloud面试题java2023

    Spring Cloud是一个基于Spring Boot实现的云应用开发工具集,它为开发者提供了在分布式系统(如配置管理、服务发现、断路器、智能路由、微代理、控制总线、一次性令牌、全局锁、领导选举、分布式会话、集群状态)中...

    Java项目实战-基于SSH的任务调度系统的设计与实现(附源码,部署说明).zip

    本项目是关于使用Java技术栈实现的一个基于SSH(Spring、Struts、Hibernate)的任务调度系统。SSH是一个流行的企业级Web应用程序开发框架,它整合了Spring的IoC(控制反转)和AOP(面向切面编程)特性,Struts的MVC...

    基于spring+quartz的分布式定时任务框架实现

    【基于spring+quartz的分布式定时任务框架实现】 在企业级应用中,定时任务是必不可少的,用于执行一些定期的任务,如数据同步、报表生成、邮件发送等。Spring框架提供了强大的定时任务支持,但要实现分布式环境下...

    Job Plus项目是基于SpringBoot+Vue的轻量级定时任务管理系统.zip

    Quartz是开源的作业调度框架,可以创建、调度和执行重复的作业,而Spring Task是Spring框架的一部分,提供了在Spring应用中定义和执行任务的能力。这些任务可以是数据库操作、文件处理、Web服务调用等。 4. **...

    基于SpringBoot电商网站.zip

    - **任务调度**:通过Spring Task或Quartz,实现后台任务自动化执行,如商品推荐、订单处理等。 - **消息队列**:整合RabbitMQ或Kafka,实现异步通信,提高系统响应速度。 3. **SpringStore-master项目结构分析**...

    基于springboot的外卖小程序管理系统源码.zip

    7. **任务调度**:可能包含订单状态自动更新、定时任务等,了解Quartz或Spring Task等调度库的使用。 8. **异常处理**:查看全局异常处理,学习如何优雅地处理和返回错误信息。 9. **测试**:了解单元测试、集成测试...

    task是一个任务调度统一管理平台 目前主要是通过http来进行任务的调度,http支持签名算法

    因此,Task能够快速集成各种服务和组件,如数据库连接、安全控制、监控等。同时,SpringBoot的微服务理念也让Task具备良好的可扩展性和维护性。 在集群部署方面,Task支持多节点部署,这意味着任务可以在多个服务器...

    Java项目之springboot在线外卖系统(源码)

    8. **调度任务**: 可能使用Quartz或Spring Task等工具实现后台任务,如定时发送订单状态更新通知、处理未付款订单等。 9. **前端技术**: 使用React、Vue.js或Angular等现代前端框架构建用户界面,配合Webpack进行...

    springboot394疫情居家办公系统--论文pf.zip

    5. 引入Quartz或Spring Task实现定时任务,如自动打卡提醒、任务到期通知等。 五、性能优化与测试 1. 使用Spring Boot Actuator进行系统监控,包括内存、线程、健康检查等指标。 2. 使用Hystrix进行服务降级、熔断...

    SpringBoot制卡审核管理系统.zip

    SpringBoot可以集成Quartz或Spring Task进行任务调度,定时发送通知。 6. 日志记录:系统需记录操作日志,便于追踪和问题排查。SpringBoot内置了Logback或Log4j2日志框架,方便进行日志管理。 7. RESTful API:...

    餐厅点餐系统,springboot+Mybatis.zip

    8. **任务调度**:系统可能包含订单状态更新、库存检查等定时任务,这可以借助Spring的Task Scheduler或Quartz等任务调度库实现。 9. **测试**:Spring Boot支持单元测试和集成测试,可以使用JUnit、Mockito等工具...

Global site tag (gtag.js) - Google Analytics