`
QING____
  • 浏览: 2262494 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

zookeeper实战:SingleWorker代码样例

 
阅读更多

     我们需要一个“单点worker”系统,此系统来确保系统中定时任务在分布式环境中,任意时刻只有一个实例处于活跃;比如,生产环境中,有6台机器支撑一个应用,但是一个应用中有30个定时任务,这些任务有些必须被在“单线程”环境中运行(例如“数据统计”任务),避免并发的原因不是在java层面,可能是在操作db数据时,或者是在消息消费时,或者是信息推送时等。某个指标的“数据统计”任务,每天只需要执行一次,即使执行多次也是妄费,因为这种类型的定时任务,需要被“单点”。同时,如果一个任务在没有报告结果的情况下异常推出,我们仍然期望集群中其他实例能够主动“接管”它。在实现不良好的架构中,可能有些开发者使用手动触发特定脚本的方式执行,有些web项目可能是通过配置特定host的方式开启任务。对于某些定时任务,可能会采用quartz-cluster中的某些实现,但是他需要数据库的额外支持。
     此时,我们将使用zookeeper来实现此功能。本实例提供了如下功能展示:
    1) 提供了单点worker功能
    2) 提供了worker均衡能力(30个worker相对均匀的分配到6台机器上)
    3) 提供了worker失效接管能力。

     但是仍有很多亟待解决的问题:
    1) 无法确保任务的接管是及时的,即一个任务执行者失效,将会在一定的过期时间后,才会被其他sid接管
    2) 在极少的情况下,仍然会有一个任务同时被2个sid执行。
    3) 在极少的情况下,会有极短的时间内,一个任务不会被任何sid接管,处于“孤立”状态
    尽管zk提供了watch机制,但是上述问题,不仅不能完全避免,还会额外增加代码的复杂度。最终我个人放弃了对在此类中使用watch的想法。。
    注意:zk中exist和create/delete等操作并非原子,可能在exist返回false的情况下,去create此节点,也有可能抛出NoExistsException;你应该能够想到“并发”环境造成此问题的时机(其他zk客户端也有类似的操作,并发)。
    注意:在zk中删除父节点,将会导致子节点一并删除;同理,如果创建一个节点,那么它的各级父节点必须已经存在,且节点的层级越深,对zk底层存储而言数据结构越冗杂。
 
    数据结构与设计思路:
    1) serverType为当前应用标识,我们期望每个应用都有各自的serverType,方便数据分类; jobType为任务类型或者任务名称;如下全节点表示某个serverType的jobType下有sid1,sid2,sid3共三个实例(例如tomcat实例,或者物理机器标识)参与了此任务。zk节点路径格式:
        /severType/jobType/register/sid1
        ............................../sid2
        ............................../sid3
    2) 表示此jobType,被sid1运行。zk节点路径格式
        /severType/jobType/alive/sid1 挂载数据:null
    3) /serverType/jobType 挂载数据:cronExpression;将任务的“cron表达式”作为数据挂载
    4) {todu} 表示serverType下每个sid运行的任务个数,我们可以用来“均衡”任务,将新任务分配给任务较少的sid上。
        /serverType/sid1 挂载数据:任务个数.

 

 

如下是本人的代码样例,实际生产环境中代码与样例有区别,此处仅供参考,本实例基于zookeeper + quartz 2.1,如有错误之处,请不吝赐教:

1) TestMain.java :测试引导类

2) PrintNumberJob.java:一个简单的任务,打印一个随即数字。

3) PrintTimeJob.java:一个简单的任务,打印当前时间。

4) SingleWorkerManager.java:核心类,用于处理调用者提交的任务,并确保结果符合预期。此类有2个内部工作线程组成,分别处理zk数据同步和用户任务交付等工作。

 

很遗憾,源代码非常的长,尽管我已经足够细心的整理格式,但还是不够悦目,建议参阅者下载代码阅读,谢谢

SingleWorkerManager.java

 

package com.sample.zk.singleWorker;

/** 
 * 
 * @author qing
 *
 */
public class SingleWorkerManager {

	private static final String GROUP = "single-worker";
	private Scheduler scheduler;
	private ZooKeeper zkClient;
	private String serverType = "_-default-_";//默认serverType类型,我祈祷不会有人估计它和一样
	private static final String REGISTER = "/register";
	private static final String ALIVE = "/alive";
	
	private Watcher dw = new InnerZK();//default watcher;
	
	private boolean isAlive = false;//是否可用
	
	private Object tag = new Object();
	
	private ReentrantLock lock = new ReentrantLock();
	
	private String sid;//当前server标记,可以是IP等,主要用来表达如下描述:某sid上运行**任务;将**任务分配给某sid;
	//真实场景下,可以为IP为192.168.197.2下tomcat运行printNumber任务;
	//任务在哪个server上运行,需要有明确的信息才行,所以sid的设计需要很直观。
	//
	//很多时候,我们都是以ip地址来标记任务被运行的环境地址,不过在有些比较“穷苦的公司”,
	//可能一个物理server下运行多个对等的tomcat实例
	//或许这种方式下,使用ip作为标记,就一些麻烦了。
	
	//已经在本地提交,但尚未提交给zk的任务,直到zk接受任务之后,提交任务者才返回
	//+++++++++++++++++++++++++++++++++++++++++++
	//如何设计任务提交,可能面临一个奇怪的选择,如下是2种队列:
	//LinkedBlockingQueue提供了阻塞与非阻塞两种方式,非阻塞的方式允许任务提交这立即返回,但是此任务此后是否能够被zk
	//正确接受将存在风险,有可能zk的故障,导致此任务无法正常运行。
	//private BlockingQueue<Worker> outgoingWorker  = new LinkedBlockingQueue<Worker>();
	//++++++++++++++++++++++++++++++++++++++++++++
	//同步队列,是一个“单工”队列,如果任务任务被zk正确接受之后,任务提交者才返回,这是一个理想的情况。任务一旦开始被处理,任务提交者就可以返回了。
	//如果任务提交时,刚好zk环境故障,那么此任务将会被重试多次,如果还未能成功,则失败。
	//++++++++++++++++++++++++
	//无论如何,你都需要做出一个选择,我选择了最直观的答案:SynchronousQueue + 同步
	//++++++++++++++++++++++++
	private SynchronousQueue<Worker> outgoingWorker = new SynchronousQueue<Worker>();
	
	//当前serverType下所有的任务
	private Map<String,Worker> allWorkers = new HashMap<String,Worker>();
	
	//当前实例上所运行的任务,它是allWorkers的子集
	private Map<String,Worker> selfWorkers = new HashMap<String,Worker>();
	
	//用来间歇性的与zk进行同步,用来检测job的冲突或者新job的分配
	private Thread syncThread;
	//用于向zk提交任务数据的线程,将和SynchronousQueue协同工作
	private Thread workerThread;
	
	/**
	 * 创建zk实例
	 */
	public SingleWorkerManager(String sid){
		this(sid,null);
		
	}
	
	public SingleWorkerManager(String sid,String sType){
		if(sType != null){
			this.serverType = sType;
		}
		try{
			zkClient = new ZooKeeper(Constants.connectString, 3000, dw,false);
		}catch(Exception e){
			e.printStackTrace();
			throw new RuntimeException(e);
		}
		this.sid = sid;
		syncThread = new Thread(new SyncHandler());
		syncThread.setDaemon(true);
		syncThread.start();
	}
	
	/**
	 * 开启任务调度器
	 */
	public void start(){
		try{
			scheduler = StdSchedulerFactory.getDefaultScheduler();
			scheduler.start();
			workerThread = new Thread(new WorkerHandler());
			workerThread.setDaemon(true);
			workerThread.start();
			isAlive = true;
			synchronized (tag) {
				tag.notifyAll();
			}
			
			//首次同步
			sync();
		}catch(Exception e){
			e.printStackTrace();
			throw new RuntimeException(e);//异常退出
		}
	}
	
	/**
	 * 关闭任务调度器,关闭zookeeper链接
	 * 此后将导致任务被立即取消,singleWorkerManager实例将无法被重用
	 */
	public void close(){
		lock.lock();
		try{
			isAlive = false;
			scheduler.shutdown();
			if (syncThread.isAlive()) {
				syncThread.interrupt();
			}
			if(workerThread.isAlive()){
				workerThread.interrupt();
			}
			if(zkClient != null){
				zkClient.close();
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	
	/**
	 * 取消job,将触发zk服务也“取消”此任务
	 * @param jobName
	 */
	public void unschedule(String jobName){
		try{
			//here,zk
			lock.unlock();
			try{
				String jobPath = "/" + serverType + "/" + jobName;
				Stat stat = zkClient.exists(jobPath, false);
				if(stat != null){
					zkClient.delete(jobPath, stat.getVersion());
				}
			}catch(NoNodeException e){
				//ignore;
			}catch(Exception e){
				e.printStackTrace();
			}
			//有syncHandler来取消本地任务
//			//here,local scheduler
//			//无论如何,本地都要取消
//			TriggerKey key = new TriggerKey(jobName, GROUP);
//			if(scheduler.checkExists(key)){
//				scheduler.unscheduleJob(key);
//			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}
	
	/**
	 * 提交任务,如果提交失败,将抛出异常
	 * @param jobClass
	 * @param cronExpression
	 * @return true任务提交成功,false任务提交失败
	 */
	public boolean schedule(Class<? extends Job> jobClass,String cronExpression){
		if(!isAlive){
			throw new IllegalStateException("worker has been closed!");
		}
		try{
			Worker worker = this.build(jobClass, cronExpression);
			return outgoingWorker.offer(worker,15,TimeUnit.SECONDS);//waiting here,最多15妙
		}catch(Exception e){
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}
	
	private Worker build(Class<? extends Job> jobClass,String cronExpression){
		String name = jobClass.getName();//全路径name
		JobDetail job = JobBuilder.newJob(jobClass).withIdentity(name,GROUP).build();
		CronScheduleBuilder sb = CronScheduleBuilder.cronSchedule(cronExpression);//每两秒执行一次:"*/2 * * * * ?"
		Trigger trigger = TriggerBuilder.newTrigger().withIdentity(name, GROUP).withSchedule(sb).build();
		return new Worker(job, trigger,cronExpression);
	}
	
	
	///////////////////////////////////////////////////////inner worker//////////////////////////////
	
	/**
	 * 当前实例的zkClient是否链接正常,scheduler是否处于可用状态
	 * @return
	 */
	private boolean isReady(){
		if(!isAlive){
			return false;
		}
		if(scheduler == null || zkClient == null){
			return false;
		}
		try{
			if(scheduler.isShutdown() || !scheduler.isStarted()){
				return false;
			}
		}catch(Exception e){
			e.printStackTrace();
			return false;
		}
		if(zkClient.getState().isConnected()){
			return true;
		}
		return false;
	}
	
	/**
	 * 同步selfWorkers列表,和zk环境中的列表进行比较,查看是否有任务冲突
	 */
	private void syncSelfWorker(){
		lock.lock();
		try{
			if(!isReady()){
				throw new RuntimeException("Scheduler error..");//以异常的方式中断
			}
			//首先检测自己持有的任务列表,是否和zk一致,首次同步,selfWorkers肯定是空,需要sync后续去做调度。
			for(String job : selfWorkers.keySet()){
				String jobPath = "/" + serverType + "/" + job;
				//如果此任务已经被远程取消,则取消本地job执行
				//所有的实例都会做同样的事情,一定会把那些“取消的任务”取消
				if(zkClient.exists(jobPath, false) == null){
					allWorkers.remove(job);
					Worker cw = selfWorkers.remove(job);
					if(cw != null){
						if(scheduler.checkExists(cw.getJob().getKey())){
							scheduler.unscheduleJob(cw.getTrigger().getKey());
						}
					}
					continue;
				}
				String alive = "/" + serverType + "/" + job + ALIVE;
				//查看是否有子节点冲突,比如一个job被多个server运行
				List<String> alives = zkClient.getChildren(alive, false);
				if(alives == null || alives.isEmpty()){
					//如果此任务尚未分配,则交付给workerHandler
					continue;
				}
				if(alives.size() == 1){
					String holder = alives.get(0);
					//如果已分配且接管者是自己,更新时间
					if(holder.equalsIgnoreCase(sid)){
						byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();
						zkClient.setData(alive + "/" + sid, data, -1);//ignore version
						continue;//如果是自己
					}
				}
				//对于其他情况,当前sid只能让步(有可能会存在所有的sid都让步,导致任务在极短时间内无法运行,
				//后台“补救”线程会做工作)
				if(zkClient.exists(alive + "/" + sid, false) != null){
					try{
						zkClient.delete(alive + "/" + sid, -1);
						scheduler.unscheduleJob(new TriggerKey(job, GROUP));
						selfWorkers.remove(job);
					}catch(NoNodeException e){
						//ignore:
					}catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	

	/**
	 * 同步任务信息,将当前实例中scheduler运行的任务和zk进行比较,进行冲突检测。
	 * 1) 检测自己正在运行的任务,是否和zk中心中分配给自己的任务列表一致。
	 * 2) 获得当前serverType下所有的任务列表
	 * 
	 */
	private void sync(){
		lock.lock();
		try{
			if(!isReady()){
				throw new RuntimeException("Scheduler error..");
			}
			//检测一级节点
			Stat tstat = zkClient.exists("/" + serverType,false);
			if(tstat == null){
				try{
					zkClient.create("/" + serverType, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}catch(NodeExistsException e){
					//ignore
				}
			}
			//+++++++++++++++++++
			syncSelfWorker();
			//+++++++++++++++++++
			
			//获得所有任务列表
			List<String> allJobs = zkClient.getChildren("/" + serverType, false);
			if(allJobs == null){
				throw new RuntimeException("NO jobs, error..");//以异常的方式,终端方法调用,没有别的意思。
			}
			allWorkers.clear();//reload all
			for(String job : allJobs){
				try{
					//job为类的全名,节点下挂载的数据为cronException
					byte[] data = zkClient.getData("/" + serverType + "/" + job, false, null);
					if(data == null || data.length == 0){
						continue;
					}
					
					//简单考虑吧,不过作为一名合格的程序员,此处可能需要太多的校验。
					Class<? extends Job> jobClass = (Class<? extends Job>)ClassLoader.getSystemClassLoader().loadClass(job);
					Worker worker = build(jobClass, new String(data));
					allWorkers.put(job,worker);
					//自己检测到任务后,注册自己
					String registerPath = "/" + serverType + "/" + job + REGISTER + "/" + sid;
					//如果不存在
					if(zkClient.exists(registerPath, false) == null){
						try{
							zkClient.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
						}catch(NodeExistsException ex){
							//ignore;如果自己已经注册过,则忽略
						}
					}
					//检测此worker是否为自己所持有
					String alivePath = "/" + serverType + "/" + job + ALIVE +"/" + sid;
					//如果此任务不属于自己运行,则继续
					if(zkClient.exists(alivePath, false) == null){
						continue;
					}
					//如果属于自己运行,则开启任务,本地是否开启任务,完全取决于zk的数据状态
					try{
						boolean exists = scheduler.checkExists(worker.getJob().getKey());
						if(!exists){
							//如果尚未在当前实例中调度,则立即调度
							scheduler.scheduleJob(worker.getJob(),worker.getTrigger());
							selfWorkers.put(job,worker);
						}
					}catch(Exception e){
						e.printStackTrace();
						zkClient.delete(alivePath, -1);//ignore version;
						//再次校验
						selfWorkers.remove(job);
					}
				}catch(ClassNotFoundException e){
					e.printStackTrace();
					throw new RuntimeException(e);
				}
			}
			
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	
	
	class InnerZK implements Watcher {

		public void process(WatchedEvent event) {
			// 如果是“数据变更”事件
			if (event.getType() != EventType.None) {
				//processExt(event);
				return;
			}
			
			// 如果是链接状态迁移
			// 参见keeperState
			switch (event.getState()) {
			case SyncConnected:
				System.out.println("Connected...");
				// 链接状态迁移时,检测worker信息
				sync();
				break;
			case Expired:
				System.out.println("Expired...");
				break;
			// session过期
			case Disconnected:
				// 链接断开,或session迁移
				System.out.println("Connecting....");
				break;
			case AuthFailed:
				close();
				throw new RuntimeException("ZK Connection auth failed...");
			default:
				break;
			}
		}
		
	}
	
	/**
	 * 分配任务,在所有的worker信息都同步结束后,然后在逐个检测任务状态,对于没有
	 * 被执行的新任务,或者已经失去托管的任务,交付给其他sid。
	 * 
	 * 任务分配,没有采取“严格均衡”的方式,我们使用了一个随即方式。
	 */
	private void scheduler(){
		lock.lock();
		for(String job : allWorkers.keySet()){
			try{
				//如果没有,则创建一个持久节点,挂载数据为,系统时间戳,你可以为此节点加上ACL控制,但会带来复杂度
				//这里可以创建为临时节点,那么你需要对此节点注册watch,当watch触发时(比如其他sid的session失效等)做job的接管
				//考虑到如果大量的job,大量的watch,在网络复杂的情况下,再加上对zk的并发操作,数据一致性是个问题。
				//此处,我们采取挂载“时间戳”的方式,在SyncHandler线程中,间歇性的去检测,惰性的非实时的分配和协调任务
				//此处就要求,你的应用服务器的时间,应该几乎非常一致,如果你无法做到,请在此处增加一个操作分支,从一个统一的地方获得时间:比如DB中等
				String alivePath = "/" + serverType + "/" + job + ALIVE;
				List<String> children = zkClient.getChildren(alivePath, false);//如果节点不存在,则在下一次sync时被补救
				if(children == null || children.isEmpty()){
					//此job尚未分配
					String registerPath = "/" + serverType + "/" + job + REGISTER;
					List<String> rc = zkClient.getChildren(registerPath, false);
					//等待下一次sync时准备节点数据
					if(rc == null || rc.isEmpty()){
						continue;
					}
					Collections.shuffle(rc);//打乱顺序,随即,取出第一个,其实你可以有很多更好的手段来实现“任务均衡”,此处仅为参考
					String tsid = rc.get(0);
					try{
						byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();
						zkClient.create(alivePath + "/" + tsid, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
						//tsid对应的syncHandler此后将会检测并补救。此处只是分配给他。
						//如果tsid也是失去托管的,那么下一次sync检测将会发现并移除,此处不再做多余的校验;
						//在极端情况下,比如你的“任务托管过期时间”过短,或者你的系统发布过程很长,但是所有的任务都失去托管
						//那么最终将会有一台机器接管大部分job,如果job个数很多,将会出现“雪崩效应”;
						//如果你不能容忍这些事情的发生,请在此处增加有效的barrier操作(如果接管任务个数达到一定个数,将接受但不执行任务)
						//或者refuse操作(既不接管也不执行任务)。
						System.out.println("Job switch,SID:" + tsid + ",JOB :" + job);
					}catch(NodeExistsException e){
						//ignore;
					}
					continue;
				}
				//如果job已经被其他sid接管,那么检测接管者,是否处于活跃,如果存在多个子节点,其实是
				//一种异常情况,此处我们只做校验,冲突有sync解决
				for(String id : children){
					String tpath = alivePath + "/" + id;
					Stat stat = new Stat();
					byte[] data = zkClient.getData(tpath, false,stat);
					long time = Long.valueOf(new String(data));
					long current = System.currentTimeMillis();
					//如果一个任务,它的执行者在2分钟内都没有和zk交互(synSelfWorker方法中会更新time)
					//表明已经过期
					//为了便于测试,此处为15秒
					if(time + 1500 < current){
						try{
							zkClient.delete(tpath, stat.getVersion());
						}catch(BadVersionException e){
							//ignore
						}catch(NoNodeException e){
							//ignore;
						}
					}else{
						System.out.println(id + " :" + job);
					}
				}
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		lock.unlock();
	}
	/**
	 * 任务同步线程,间歇性的检测zk持有的任务和本地任务是否一致
	 * 并负责分配任务
	 * @author qing
	 *
	 */
	class SyncHandler implements Runnable {

		public void run() {
			try {
				int i = 0;
				int l = 10;
				while (true) {
					synchronized (tag) {
						try{
							while(!scheduler.isStarted()){
								tag.wait();
							}
						}catch(Exception e){
							//
						}
					}
					System.out.println("Sync handler,running...tid: " + Thread.currentThread().getId());
					if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) {
						lock.lock();
						try {
							// 回话重建等异常行为
							zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false);
							System.out.println("Reconnected success!...");
						} catch (Exception e) {
							e.printStackTrace();
							i++;
							Thread.sleep(3000 + i * l);// 在zk环境异常情况下,每3秒重试一次
						} finally {
							lock.unlock();
						}
						continue;
					}
					if (zkClient.getState().isConnected()) {
						sync();//同步任务
						scheduler();//任务分配和过期检测
						Thread.sleep(3000);// 如果被“中断”,直接退出
						i = 0;
					}else{
						Thread.sleep(3000);
					}
				}
			} catch (InterruptedException e) {
				System.out.println("SID:" + sid + ",SyncHandler Exit...");
				close();
			}

		}
	}
	
	/**
	 * 调用者提交的任务,将会被同步的方式交付给zk。此线程就是负责从queue中获取调用者
	 * 提交的job,然后依次在zk环境中生成节点数据。
	 * @author qing
	 *
	 */
	class WorkerHandler implements Runnable{
		private Set<Worker> pending = new HashSet<Worker>();
		private int count = 0;//max = 20;
		
		/**
		 * 将worker信息生成zk节点数据
		 * @param worker
		 * @return
		 */
		private boolean register(Worker worker){
			lock.lock();
			//逐级创建其父节点
			String jobName = worker.getJob().getKey().getName();
			try{
				Transaction tx = zkClient.transaction();//使用事务的方式
				String jobPath = "/" + serverType + "/" + jobName;
				if(zkClient.exists(jobPath, false) == null){
					tx.create(jobPath, worker.getCronExpression().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}
				String registerPath = "/" + serverType + "/" + jobName+ REGISTER;
				if(zkClient.exists(registerPath, false) == null){
					tx.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}
				String alivePath = "/" + serverType + "/" + jobName+ ALIVE;
				if(zkClient.exists(alivePath, false) == null){
					tx.create(alivePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}
				tx.create(registerPath + "/" + sid, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				tx.commit();
			}catch(NodeExistsException e){
				//ignore
			}catch(Exception e){
				e.printStackTrace();
				pending.add(worker);
				//对于异常数据,添加到一个补充操作队列,如果在操作中出现异常,那么将会在
				//补充操作中得到再次校验
			}
			lock.unlock();
			return true;
		}
		
		public void run(){
			try{
				while(true){
					synchronized (tag) {
						try{
							while(!scheduler.isStarted()){
								tag.wait();
							}
						}catch(Exception e){
							//
						}
					}
					System.out.println("Worker handler,running...");
					if(zkClient != null && zkClient.getState().isConnected()){
						System.out.println("Register...");
						Worker worker = outgoingWorker.take();
						register(worker);
						if(!pending.isEmpty()){
							Thread.sleep(500);
							Iterator<Worker> it = pending.iterator();
							while(it.hasNext()){
								boolean isOk = register(it.next());
								if(!isOk){
									count++;
									Thread.sleep(1000);
								}else{
									count = 0;
									it.remove();
								}
								//如果重试20次,仍无法成功,直接抛弃,非常遗憾
								if(count > 20){
									pending.clear();
								}
							}
						}
						
					}else{
						Thread.sleep(1000);
					}
				}
				
			}catch(InterruptedException e){
				System.out.println("SID:" + sid + ",WorkerHandler Exit...");
				close();
			}
		}
	}
	
	/**
	 * 全部删除当前serverType下所有的任务
	 */
	public void clear(){
		lock.lock();
		try{
			if(zkClient != null && zkClient.getState().isConnected()){
				zkClient.delete("/" + serverType, -1);
			}
//			if(scheduler != null && scheduler.isStarted()){
//				for(Worker worker : selfWorkers.values()){
//					scheduler.unscheduleJob(worker.getTrigger().getKey());
//				}
//			}
//			allWorkers.clear();
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}

}

 

其他辅助类,请参考附件中的源码,谢谢。

分享到:
评论
2 楼 QING____ 2015-06-01  
cpusoft 写道
是否可以考虑用zk的选举机制呢,先选举一个leader,然后这个leader执行实际的业务。
参见
http://www.infoq.com/cn/news/2014/06/qunaer-task-dispatching-center


原则上可以,其中apache 的curator提供了较好的解决方案,建议参考。
1 楼 cpusoft 2015-05-31  
是否可以考虑用zk的选举机制呢,先选举一个leader,然后这个leader执行实际的业务。
参见
http://www.infoq.com/cn/news/2014/06/qunaer-task-dispatching-center

相关推荐

    电子工程0欧姆电阻在PCB设计中的多功能应用

    内容概要:0欧姆电阻在电路设计中有多种重要作用。它不仅可以在PCB上为调试提供便利,还能用于跳线、替代不确定参数的元件以及测量电路的耗电流。此外,在布线困难时可作为应急解决方案。在高频信号环境下,它能充当电感或电容,有助于解决EMC问题。对于地线处理,0欧姆电阻可用于实现单点接地,避免模拟地和数字地直接大面积相连带来的互相干扰问题。在跨接电流回路方面,它可以提供较短的回流路径,减少干扰。同时,0欧姆电阻还适用于配置电路,防止用户误操作跳线或拨码开关,并且在布线、调试、测试、温度补偿等方面有着广泛应用,尤其在EMC对策中表现突出。; 适合人群:电子工程师、硬件设计师以及对电路设计感兴趣的爱好者。; 使用场景及目标:①在PCB设计阶段,利用0欧姆电阻进行灵活的电路调试与优化;②解决高频信号下的EMC问题,确保电路稳定性和抗干扰能力;③实现单点接地,避免不同地线间的相互干扰;④提高电路的可维护性和可靠性,降低生产成本。; 阅读建议:本文详细介绍了0欧姆电阻在电路设计中的多种应用场景,读者应结合具体项目需求来理解和运用这些知识,特别是在面对复杂的电路布局和电磁兼容性问题时,要充分考虑0欧姆电阻的独特优势。

    一个基于SpringBoot+Mybatis+Mysql+Html实现的页面登录案例

    mysql安装教程 一个基于SpringBoot+Mybatis+Mysql+Html实现的页面登录案例.

    全域旅游综合解决方案PPT(71页).pptx

    在探索智慧旅游的新纪元中,一个集科技、创新与服务于一体的整体解决方案正悄然改变着我们的旅行方式。智慧旅游,作为智慧城市的重要分支,旨在通过新一代信息技术,如云计算、大数据、物联网等,为游客、旅游企业及政府部门提供无缝对接、高效互动的旅游体验与管理模式。这一方案不仅重新定义了旅游行业的服务标准,更开启了旅游业数字化转型的新篇章。 智慧旅游的核心在于“以人为本”,它不仅仅关注技术的革新,更注重游客体验的提升。从游前的行程规划、信息查询,到游中的智能导航、个性化导览,再到游后的心情分享、服务评价,智慧旅游通过构建“一云多屏”的服务平台,让游客在旅游的全过程中都能享受到便捷、个性化的服务。例如,游客可以通过手机APP轻松定制专属行程,利用智能语音导览深入了解景点背后的故事,甚至通过三维GIS地图实现虚拟漫游,提前感受目的地的魅力。这些创新服务不仅增强了游客的参与感和满意度,也让旅游变得更加智能化、趣味化。 此外,智慧旅游还为旅游企业和政府部门带来了前所未有的管理变革。通过大数据分析,旅游企业能够精准把握市场动态,实现旅游产品的精准营销和个性化推荐,从而提升市场竞争力。而政府部门则能利用智慧旅游平台实现对旅游资源的科学规划和精细管理,提高监管效率和质量。例如,通过实时监控和数据分析,政府可以迅速应对旅游高峰期的客流压力,有效预防景区超载,保障游客安全。同时,智慧旅游还促进了跨行业、跨部门的数据共享与协同合作,为旅游业的可持续发展奠定了坚实基础。总之,智慧旅游以其独特的魅力和无限潜力,正引领着旅游业迈向一个更加智慧、便捷、高效的新时代。

    工业自动化中模拟量滤波防抖PLC程序的实现与应用

    内容概要:本文详细介绍了如何通过PLC程序实现模拟量滤波防抖,确保电流、电压和热电阻等信号的准确采集。核心算法采用掐头去尾平均法,即去掉一组数据中的最大值和最小值后取剩余数据的平均值,以消除因环境干扰导致的异常值。文中提供了详细的代码实现步骤,包括数据结构定义、主程序逻辑、间接寻址方法以及参数配置。此外,还讨论了如何通过死区判断和上升率限制进一步优化滤波效果,提高系统的稳定性和响应速度。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是熟悉PLC编程和模拟量信号处理的专业人士。 使用场景及目标:适用于需要高精度模拟量信号采集的工业控制系统,如电力、化工、制造业等领域。主要目标是提升数据采集的准确性和稳定性,减少外部干扰带来的误差。 其他说明:文中提供的代码示例基于西门子S7-1200/1500系列PLC,但相关原理和方法同样适用于其他品牌的PLC。建议在实际应用中根据具体情况调整参数设置,以达到最佳效果。

    【人工智能大模型发展】从技术突破到场景落地:大模型发展图谱与DeepSeek创新应用解析

    内容概要:本文详细介绍了大模型的发展现状与未来趋势,尤其聚焦于DeepSeek这一创新应用。文章首先回顾了人工智能的定义、分类及其发展历程,指出从摩尔定律到知识密度提升的转变,强调了大模型知识密度的重要性。随后,文章深入探讨了DeepSeek的发展路径及其核心价值,包括其推理模型、思维链技术的应用及局限性。此外,文章展示了DeepSeek在多个行业的应用场景,如智能客服、医疗、金融等,并分析了DeepSeek如何赋能个人发展,具体体现在公文写作、文档处理、知识搜索、论文写作等方面。最后,文章展望了大模型的发展趋势,如通用大模型与垂域大模型的协同发展,以及本地部署小模型成为主流应用渠道的趋势。 适合人群:对人工智能和大模型技术感兴趣的从业者、研究人员及希望利用DeepSeek提升工作效率的个人用户。 使用场景及目标:①了解大模型技术的最新进展和发展趋势;②掌握DeepSeek在不同领域的具体应用场景和操作方法;③学习如何通过DeepSeek提升个人在公文写作、文档处理、知识搜索、论文写作等方面的工作效率;④探索大模型在特定行业的应用潜力,如医疗、金融等领域。 其他说明:本文不仅提供了理论知识,还结合实际案例,详细介绍了DeepSeek在各个场景下的应用方式,帮助读者更好地理解和应用大模型技术。同时,文章也指出了当前大模型技术面临的挑战,如模型的局限性和数据安全问题,鼓励读者关注技术的持续改进和发展。

    电力负荷预测中LSSVM及其改进算法的性能对比研究

    内容概要:本文详细比较了四种基于最小二乘支持向量机(LSSVM)的短期电力负荷预测算法:原始LSSVM、SSA-LSSVM、VMD-LSSVM以及VMD-SSA-LSSVM。通过对这些算法的具体实现和性能评估,展示了每种方法的优势和局限性。实验结果显示,随着算法复杂度的增加,预测精度显著提高,特别是VMD-SSA-LSSVM在RMSE和MAPE等评价指标上表现出色,达到了接近真实值的预测效果。然而,这也伴随着计算成本的大幅上升。 适合人群:从事电力系统调度、数据分析、机器学习领域的研究人员和技术人员。 使用场景及目标:适用于需要进行短期电力负荷预测的研究项目或实际应用,旨在提高预测准确性,减少因天气变化、节假日等因素带来的不确定性影响。 其他说明:文中提供了详细的Python代码片段,帮助读者理解和复现相关算法。同时提醒,在选择模型时需综合考虑预测精度与计算效率之间的平衡。

    基于Python+Django的电影推荐系统:融合机器学习与深度学习的全栈实现

    内容概要:本文详细介绍了一种基于Python和Django框架构建的电影推荐系统。该系统不仅涵盖了用户端的基本功能(如登录、搜索、浏览、评论、评分、收藏),还包括管理端的增删改查操作。后端使用Python和Django框架,结合MySQL数据库,前端采用HTML、CSS和JavaScript实现交互界面。推荐算法方面,利用机器学习和深度学习技术,特别是协同过滤和内容过滤相结合的方式,确保推荐结果的多样性和精准性。此外,文中还讨论了一些常见的技术挑战及其解决方案,如用户冷启动问题、前端交互效果优化、数据库配置错误等。 适合人群:具有一定编程经验的Web开发者和技术爱好者,尤其是对Django框架、机器学习和深度学习感兴趣的读者。 使用场景及目标:适用于希望深入了解并实现一个完整的电影推荐系统的个人或团队。主要目标是掌握如何整合前后端技术,运用机器学习和深度学习算法提升用户体验。 其他说明:文中提供了大量代码片段和实践经验,帮助读者更好地理解和实施各个技术细节。同时强调了系统优化的重要性,如通过Redis缓存提高查询效率,使用AJAX实现无缝加载等。

    MATLAB实现V2G光储充一体化微网多目标优化调度策略及其应用

    内容概要:本文探讨了基于MATLAB平台的V2G(车辆到电网)光储充一体化微网多目标优化调度策略。该策略旨在通过建立光伏微网中以经济性和并网负荷波动率为双目标的蓄电池和V2G协同调度模型,利用粒子群优化(PSO)算法求解模型。文中详细介绍了模型搭建、核心算法实现、运行模式对比以及算例分析。结果显示,V2G模式能够显著提高系统的经济性和稳定性,减少蓄电池的需求量,优化三方(电网、微网调度中心、电动汽车用户)的利益。 适合人群:从事电力系统优化、智能电网研究的专业人士,尤其是对MATLAB编程有一定基础的研究人员和技术人员。 使用场景及目标:适用于需要优化光储充一体化微网调度策略的研究机构和企业。目标是在保证系统经济运行的同时,稳定并网负荷,减少波动,从而提升整体性能。 其他说明:代码注释详尽,包含并行计算框架、电池寿命模型和可视化模块等多个亮点。通过实际案例验证,证明了V2G模式的有效性。

    三菱FX3U五轴钻孔机PLC与威纶通触摸屏程序解析及优化技巧

    内容概要:本文详细介绍了三菱FX3U五轴钻孔机的PLC程序和威纶通触摸屏配置,涵盖梯形图编程、IO分配表、参数设置、自动补偿机制以及异常处理等方面。文章通过具体的代码实例展示了如何实现加工循环、参数动态调整、安全防护等功能,并分享了调试过程中遇到的问题及解决方案。此外,还提供了完整的工程文件,便于读者快速理解和应用。 适合人群:从事工业自动化领域的工程师和技术人员,尤其是对三菱PLC和威纶通触摸屏有一定了解的人群。 使用场景及目标:帮助读者掌握五轴钻孔机的控制系统设计方法,提高编程效率和设备稳定性,适用于类似机床控制系统的开发和维护。 其他说明:文中提到的许多技巧和注意事项来源于作者的实际工作经验,对于初学者来说非常有价值。同时,提供的完整工程文件可以作为参考模板,节省开发时间和成本。

    matlab开发相关资源.m

    matlab开发相关资源

    a383d-main.zip

    a383d-main.zip

    智慧小区解决方案.pptx

    智慧小区解决方案.pptx

    Seafile 基于 Qt 的 GUI 客户端

    Seafile 基于 Qt 的 GUI 客户端

    无人驾驶车辆局部路径规划:基于Matlab的Astar与RRT算法仿真及优化

    内容概要:本文详细介绍了无人驾驶车辆在局部路径规划中的两种经典算法——Astar和RRT的Matlab实现及其优化。首先,文章解释了Astar算法的核心思想,即通过启发函数进行路径搜索,并针对U型障碍等问题提出了双向搜索策略和动态权重调节。接着,文章探讨了RRT算法的特点,如随机生长特性和路径平滑处理,解决了路径过于曲折的问题。此外,还提出了一种混合算法HRA*,通过改进OPEN集的维护方式,提高了算法效率。最后,通过对不同场景的仿真测试,展示了两种算法在复杂环境中的性能差异,并提供了详细的调参经验和优化建议。 适合人群:对无人驾驶技术和路径规划感兴趣的科研人员、工程师以及有一定编程基础的学习者。 使用场景及目标:适用于研究无人驾驶车辆在复杂环境中的路径规划问题,帮助研究人员理解和优化Astar和RRT算法,提高路径规划的效率和准确性。 其他说明:文中附有大量Matlab代码片段和仿真结果图表,便于读者理解和复现实验。同时,提供了关于栅格地图分辨率、车辆动力学参数等方面的实用建议,有助于实际系统的部署和优化。

    选择.txt

    选择

    西门子200Smart与维纶触摸屏在疫苗车间控制系统的应用:配液、发酵、纯化及CIP清洗工艺详解

    内容概要:本文详细介绍了西门子200Smart PLC与维纶触摸屏在某疫苗车间控制系统的具体应用,涵盖配液、发酵、纯化及CIP清洗四个主要工艺环节。文中不仅展示了具体的编程代码和技术细节,还分享了许多实战经验和调试技巧。例如,在配液罐中,通过模拟量处理确保温度和液位的精确控制;发酵罐部分,着重讨论了PID参数整定和USS通讯控制变频器的方法;纯化过程中,强调了双PID串级控制的应用;CIP清洗环节,则涉及复杂的定时器逻辑和阀门联锁机制。此外,文章还提到了一些常见的陷阱及其解决方案,如通讯干扰、状态机切换等问题。 适合人群:具有一定PLC编程基础的技术人员,尤其是从事工业自动化领域的工程师。 使用场景及目标:适用于需要深入了解PLC与触摸屏集成控制系统的工程师,帮助他们在实际项目中更好地理解和应用相关技术和方法,提高系统的稳定性和可靠性。 其他说明:文章提供了大量实战经验和代码片段,有助于读者快速掌握关键技术点,并避免常见错误。同时,文中提到的一些优化措施和调试技巧对提升系统性能非常有帮助。

    Prosemirror 是一个基于 ContentEditable 的所见即所得 HTML 编辑器,功能强大,支持协作编辑和自定义文档模式Prosemirror 库由多个单独的模块

    Prosemirror 是一个基于 ContentEditable 的所见即所得 HTML 编辑器,功能强大,支持协作编辑和自定义文档模式Prosemirror 库由多个单独的模块

    直线感应电机瞬态磁场仿真教程:Maxwell 16.0与ANSYS 2020关键技术解析

    内容概要:本文详细介绍了使用Maxwell 16.0和ANSYS 2020进行直线感应电机瞬态磁场仿真的方法和技术要点。首先强调了建模前的准备工作,包括初级线圈布置、次级导体材料选择、气隙宽度等参数的确定。然后针对Maxwell 16.0用户,讲解了坐标系的选择(笛卡尔坐标系)、初级绕组绘制、运动参数设置、网格剖分优化以及边界条件的正确配置。对于ANSYS 2020用户,则着重讲述了如何利用Maxwell模块建立模型并在Mechanical中进行电磁力耦合分析,包括参数化扫描设置、气隙厚度扫描、磁密云图动态更新等技巧。此外,文中还分享了许多实用的经验和注意事项,如避免常见的参数设置错误、提高仿真精度的方法、处理推力波动等问题的具体措施。 适合人群:从事电机设计与仿真的工程师、研究人员,尤其是有一定Maxwell和ANSYS使用基础的技术人员。 使用场景及目标:帮助用户掌握直线感应电机瞬态磁场仿真的全流程,确保仿真结果的准确性,提升工作效率。具体应用场景包括但不限于新电机设计验证、现有电机性能优化、故障诊断等。 其他说明:文中提供了大量具体的命令和脚本示例,便于读者直接应用到实际工作中。同时,作者结合自身丰富的实践经验,给出了许多宝贵的建议和警示,有助于读者避开常见陷阱,顺利完成仿真任务。

    【Windows系统】Win10部署DeepSeek 7B模型:Ollama框架安装与模型运行体验

    内容概要:本文详细介绍了在Windows 10上部署DeepSeek 7B模型的步骤。首先,需安装Ollama框架,通过访问官网下载并运行安装包,安装路径默认为C盘且不可更改。安装完成后可通过命令提示符验证是否安装成功。接着,部署DeepSeek 7B模型,从指定网站下载模型后,使用命令`ollama run deepseek-r1:7b`启动模型,系统将自动下载模型文件(约4.7GB),建议开启科学上网以加快下载速度。部署完成后,可以通过ChatBox客户端选择Ollama API和DeepSeek 7B模型进行问答测试。最后,附录提供了DeepSeek 7B的部署要求及硬件配置建议。 适合人群:对AI模型部署有一定兴趣,尤其是希望在本地环境中运行大型语言模型的研究人员和开发者。 使用场景及目标:①为研究人员和开发者提供详细的步骤指导,确保他们能够在本地环境中成功部署DeepSeek 7B模型;②帮助用户理解部署过程中涉及的各项命令和工具的使用方法;③为后续基于DeepSeek 7B模型的应用开发打下基础。 阅读建议:由于部署过程涉及多个步骤和命令行操作,建议读者在实际操作前仔细阅读每一步骤,并根据自身硬件条件调整配置。此外,对于初次接触此类部署的用户,建议先熟悉相关命令行工具的使用,确保顺利完成部署。

Global site tag (gtag.js) - Google Analytics