`
QING____
  • 浏览: 2255108 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

zookeeper实战:SingleWorker代码样例

 
阅读更多

     我们需要一个“单点worker”系统,此系统来确保系统中定时任务在分布式环境中,任意时刻只有一个实例处于活跃;比如,生产环境中,有6台机器支撑一个应用,但是一个应用中有30个定时任务,这些任务有些必须被在“单线程”环境中运行(例如“数据统计”任务),避免并发的原因不是在java层面,可能是在操作db数据时,或者是在消息消费时,或者是信息推送时等。某个指标的“数据统计”任务,每天只需要执行一次,即使执行多次也是妄费,因为这种类型的定时任务,需要被“单点”。同时,如果一个任务在没有报告结果的情况下异常推出,我们仍然期望集群中其他实例能够主动“接管”它。在实现不良好的架构中,可能有些开发者使用手动触发特定脚本的方式执行,有些web项目可能是通过配置特定host的方式开启任务。对于某些定时任务,可能会采用quartz-cluster中的某些实现,但是他需要数据库的额外支持。
     此时,我们将使用zookeeper来实现此功能。本实例提供了如下功能展示:
    1) 提供了单点worker功能
    2) 提供了worker均衡能力(30个worker相对均匀的分配到6台机器上)
    3) 提供了worker失效接管能力。

     但是仍有很多亟待解决的问题:
    1) 无法确保任务的接管是及时的,即一个任务执行者失效,将会在一定的过期时间后,才会被其他sid接管
    2) 在极少的情况下,仍然会有一个任务同时被2个sid执行。
    3) 在极少的情况下,会有极短的时间内,一个任务不会被任何sid接管,处于“孤立”状态
    尽管zk提供了watch机制,但是上述问题,不仅不能完全避免,还会额外增加代码的复杂度。最终我个人放弃了对在此类中使用watch的想法。。
    注意:zk中exist和create/delete等操作并非原子,可能在exist返回false的情况下,去create此节点,也有可能抛出NoExistsException;你应该能够想到“并发”环境造成此问题的时机(其他zk客户端也有类似的操作,并发)。
    注意:在zk中删除父节点,将会导致子节点一并删除;同理,如果创建一个节点,那么它的各级父节点必须已经存在,且节点的层级越深,对zk底层存储而言数据结构越冗杂。
 
    数据结构与设计思路:
    1) serverType为当前应用标识,我们期望每个应用都有各自的serverType,方便数据分类; jobType为任务类型或者任务名称;如下全节点表示某个serverType的jobType下有sid1,sid2,sid3共三个实例(例如tomcat实例,或者物理机器标识)参与了此任务。zk节点路径格式:
        /severType/jobType/register/sid1
        ............................../sid2
        ............................../sid3
    2) 表示此jobType,被sid1运行。zk节点路径格式
        /severType/jobType/alive/sid1 挂载数据:null
    3) /serverType/jobType 挂载数据:cronExpression;将任务的“cron表达式”作为数据挂载
    4) {todu} 表示serverType下每个sid运行的任务个数,我们可以用来“均衡”任务,将新任务分配给任务较少的sid上。
        /serverType/sid1 挂载数据:任务个数.

 

 

如下是本人的代码样例,实际生产环境中代码与样例有区别,此处仅供参考,本实例基于zookeeper + quartz 2.1,如有错误之处,请不吝赐教:

1) TestMain.java :测试引导类

2) PrintNumberJob.java:一个简单的任务,打印一个随即数字。

3) PrintTimeJob.java:一个简单的任务,打印当前时间。

4) SingleWorkerManager.java:核心类,用于处理调用者提交的任务,并确保结果符合预期。此类有2个内部工作线程组成,分别处理zk数据同步和用户任务交付等工作。

 

很遗憾,源代码非常的长,尽管我已经足够细心的整理格式,但还是不够悦目,建议参阅者下载代码阅读,谢谢

SingleWorkerManager.java

 

package com.sample.zk.singleWorker;

/** 
 * 
 * @author qing
 *
 */
public class SingleWorkerManager {

	private static final String GROUP = "single-worker";
	private Scheduler scheduler;
	private ZooKeeper zkClient;
	private String serverType = "_-default-_";//默认serverType类型,我祈祷不会有人估计它和一样
	private static final String REGISTER = "/register";
	private static final String ALIVE = "/alive";
	
	private Watcher dw = new InnerZK();//default watcher;
	
	private boolean isAlive = false;//是否可用
	
	private Object tag = new Object();
	
	private ReentrantLock lock = new ReentrantLock();
	
	private String sid;//当前server标记,可以是IP等,主要用来表达如下描述:某sid上运行**任务;将**任务分配给某sid;
	//真实场景下,可以为IP为192.168.197.2下tomcat运行printNumber任务;
	//任务在哪个server上运行,需要有明确的信息才行,所以sid的设计需要很直观。
	//
	//很多时候,我们都是以ip地址来标记任务被运行的环境地址,不过在有些比较“穷苦的公司”,
	//可能一个物理server下运行多个对等的tomcat实例
	//或许这种方式下,使用ip作为标记,就一些麻烦了。
	
	//已经在本地提交,但尚未提交给zk的任务,直到zk接受任务之后,提交任务者才返回
	//+++++++++++++++++++++++++++++++++++++++++++
	//如何设计任务提交,可能面临一个奇怪的选择,如下是2种队列:
	//LinkedBlockingQueue提供了阻塞与非阻塞两种方式,非阻塞的方式允许任务提交这立即返回,但是此任务此后是否能够被zk
	//正确接受将存在风险,有可能zk的故障,导致此任务无法正常运行。
	//private BlockingQueue<Worker> outgoingWorker  = new LinkedBlockingQueue<Worker>();
	//++++++++++++++++++++++++++++++++++++++++++++
	//同步队列,是一个“单工”队列,如果任务任务被zk正确接受之后,任务提交者才返回,这是一个理想的情况。任务一旦开始被处理,任务提交者就可以返回了。
	//如果任务提交时,刚好zk环境故障,那么此任务将会被重试多次,如果还未能成功,则失败。
	//++++++++++++++++++++++++
	//无论如何,你都需要做出一个选择,我选择了最直观的答案:SynchronousQueue + 同步
	//++++++++++++++++++++++++
	private SynchronousQueue<Worker> outgoingWorker = new SynchronousQueue<Worker>();
	
	//当前serverType下所有的任务
	private Map<String,Worker> allWorkers = new HashMap<String,Worker>();
	
	//当前实例上所运行的任务,它是allWorkers的子集
	private Map<String,Worker> selfWorkers = new HashMap<String,Worker>();
	
	//用来间歇性的与zk进行同步,用来检测job的冲突或者新job的分配
	private Thread syncThread;
	//用于向zk提交任务数据的线程,将和SynchronousQueue协同工作
	private Thread workerThread;
	
	/**
	 * 创建zk实例
	 */
	public SingleWorkerManager(String sid){
		this(sid,null);
		
	}
	
	public SingleWorkerManager(String sid,String sType){
		if(sType != null){
			this.serverType = sType;
		}
		try{
			zkClient = new ZooKeeper(Constants.connectString, 3000, dw,false);
		}catch(Exception e){
			e.printStackTrace();
			throw new RuntimeException(e);
		}
		this.sid = sid;
		syncThread = new Thread(new SyncHandler());
		syncThread.setDaemon(true);
		syncThread.start();
	}
	
	/**
	 * 开启任务调度器
	 */
	public void start(){
		try{
			scheduler = StdSchedulerFactory.getDefaultScheduler();
			scheduler.start();
			workerThread = new Thread(new WorkerHandler());
			workerThread.setDaemon(true);
			workerThread.start();
			isAlive = true;
			synchronized (tag) {
				tag.notifyAll();
			}
			
			//首次同步
			sync();
		}catch(Exception e){
			e.printStackTrace();
			throw new RuntimeException(e);//异常退出
		}
	}
	
	/**
	 * 关闭任务调度器,关闭zookeeper链接
	 * 此后将导致任务被立即取消,singleWorkerManager实例将无法被重用
	 */
	public void close(){
		lock.lock();
		try{
			isAlive = false;
			scheduler.shutdown();
			if (syncThread.isAlive()) {
				syncThread.interrupt();
			}
			if(workerThread.isAlive()){
				workerThread.interrupt();
			}
			if(zkClient != null){
				zkClient.close();
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	
	/**
	 * 取消job,将触发zk服务也“取消”此任务
	 * @param jobName
	 */
	public void unschedule(String jobName){
		try{
			//here,zk
			lock.unlock();
			try{
				String jobPath = "/" + serverType + "/" + jobName;
				Stat stat = zkClient.exists(jobPath, false);
				if(stat != null){
					zkClient.delete(jobPath, stat.getVersion());
				}
			}catch(NoNodeException e){
				//ignore;
			}catch(Exception e){
				e.printStackTrace();
			}
			//有syncHandler来取消本地任务
//			//here,local scheduler
//			//无论如何,本地都要取消
//			TriggerKey key = new TriggerKey(jobName, GROUP);
//			if(scheduler.checkExists(key)){
//				scheduler.unscheduleJob(key);
//			}
		}catch(Exception e){
			e.printStackTrace();
		}
	}
	
	/**
	 * 提交任务,如果提交失败,将抛出异常
	 * @param jobClass
	 * @param cronExpression
	 * @return true任务提交成功,false任务提交失败
	 */
	public boolean schedule(Class<? extends Job> jobClass,String cronExpression){
		if(!isAlive){
			throw new IllegalStateException("worker has been closed!");
		}
		try{
			Worker worker = this.build(jobClass, cronExpression);
			return outgoingWorker.offer(worker,15,TimeUnit.SECONDS);//waiting here,最多15妙
		}catch(Exception e){
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}
	
	private Worker build(Class<? extends Job> jobClass,String cronExpression){
		String name = jobClass.getName();//全路径name
		JobDetail job = JobBuilder.newJob(jobClass).withIdentity(name,GROUP).build();
		CronScheduleBuilder sb = CronScheduleBuilder.cronSchedule(cronExpression);//每两秒执行一次:"*/2 * * * * ?"
		Trigger trigger = TriggerBuilder.newTrigger().withIdentity(name, GROUP).withSchedule(sb).build();
		return new Worker(job, trigger,cronExpression);
	}
	
	
	///////////////////////////////////////////////////////inner worker//////////////////////////////
	
	/**
	 * 当前实例的zkClient是否链接正常,scheduler是否处于可用状态
	 * @return
	 */
	private boolean isReady(){
		if(!isAlive){
			return false;
		}
		if(scheduler == null || zkClient == null){
			return false;
		}
		try{
			if(scheduler.isShutdown() || !scheduler.isStarted()){
				return false;
			}
		}catch(Exception e){
			e.printStackTrace();
			return false;
		}
		if(zkClient.getState().isConnected()){
			return true;
		}
		return false;
	}
	
	/**
	 * 同步selfWorkers列表,和zk环境中的列表进行比较,查看是否有任务冲突
	 */
	private void syncSelfWorker(){
		lock.lock();
		try{
			if(!isReady()){
				throw new RuntimeException("Scheduler error..");//以异常的方式中断
			}
			//首先检测自己持有的任务列表,是否和zk一致,首次同步,selfWorkers肯定是空,需要sync后续去做调度。
			for(String job : selfWorkers.keySet()){
				String jobPath = "/" + serverType + "/" + job;
				//如果此任务已经被远程取消,则取消本地job执行
				//所有的实例都会做同样的事情,一定会把那些“取消的任务”取消
				if(zkClient.exists(jobPath, false) == null){
					allWorkers.remove(job);
					Worker cw = selfWorkers.remove(job);
					if(cw != null){
						if(scheduler.checkExists(cw.getJob().getKey())){
							scheduler.unscheduleJob(cw.getTrigger().getKey());
						}
					}
					continue;
				}
				String alive = "/" + serverType + "/" + job + ALIVE;
				//查看是否有子节点冲突,比如一个job被多个server运行
				List<String> alives = zkClient.getChildren(alive, false);
				if(alives == null || alives.isEmpty()){
					//如果此任务尚未分配,则交付给workerHandler
					continue;
				}
				if(alives.size() == 1){
					String holder = alives.get(0);
					//如果已分配且接管者是自己,更新时间
					if(holder.equalsIgnoreCase(sid)){
						byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();
						zkClient.setData(alive + "/" + sid, data, -1);//ignore version
						continue;//如果是自己
					}
				}
				//对于其他情况,当前sid只能让步(有可能会存在所有的sid都让步,导致任务在极短时间内无法运行,
				//后台“补救”线程会做工作)
				if(zkClient.exists(alive + "/" + sid, false) != null){
					try{
						zkClient.delete(alive + "/" + sid, -1);
						scheduler.unscheduleJob(new TriggerKey(job, GROUP));
						selfWorkers.remove(job);
					}catch(NoNodeException e){
						//ignore:
					}catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	

	/**
	 * 同步任务信息,将当前实例中scheduler运行的任务和zk进行比较,进行冲突检测。
	 * 1) 检测自己正在运行的任务,是否和zk中心中分配给自己的任务列表一致。
	 * 2) 获得当前serverType下所有的任务列表
	 * 
	 */
	private void sync(){
		lock.lock();
		try{
			if(!isReady()){
				throw new RuntimeException("Scheduler error..");
			}
			//检测一级节点
			Stat tstat = zkClient.exists("/" + serverType,false);
			if(tstat == null){
				try{
					zkClient.create("/" + serverType, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}catch(NodeExistsException e){
					//ignore
				}
			}
			//+++++++++++++++++++
			syncSelfWorker();
			//+++++++++++++++++++
			
			//获得所有任务列表
			List<String> allJobs = zkClient.getChildren("/" + serverType, false);
			if(allJobs == null){
				throw new RuntimeException("NO jobs, error..");//以异常的方式,终端方法调用,没有别的意思。
			}
			allWorkers.clear();//reload all
			for(String job : allJobs){
				try{
					//job为类的全名,节点下挂载的数据为cronException
					byte[] data = zkClient.getData("/" + serverType + "/" + job, false, null);
					if(data == null || data.length == 0){
						continue;
					}
					
					//简单考虑吧,不过作为一名合格的程序员,此处可能需要太多的校验。
					Class<? extends Job> jobClass = (Class<? extends Job>)ClassLoader.getSystemClassLoader().loadClass(job);
					Worker worker = build(jobClass, new String(data));
					allWorkers.put(job,worker);
					//自己检测到任务后,注册自己
					String registerPath = "/" + serverType + "/" + job + REGISTER + "/" + sid;
					//如果不存在
					if(zkClient.exists(registerPath, false) == null){
						try{
							zkClient.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
						}catch(NodeExistsException ex){
							//ignore;如果自己已经注册过,则忽略
						}
					}
					//检测此worker是否为自己所持有
					String alivePath = "/" + serverType + "/" + job + ALIVE +"/" + sid;
					//如果此任务不属于自己运行,则继续
					if(zkClient.exists(alivePath, false) == null){
						continue;
					}
					//如果属于自己运行,则开启任务,本地是否开启任务,完全取决于zk的数据状态
					try{
						boolean exists = scheduler.checkExists(worker.getJob().getKey());
						if(!exists){
							//如果尚未在当前实例中调度,则立即调度
							scheduler.scheduleJob(worker.getJob(),worker.getTrigger());
							selfWorkers.put(job,worker);
						}
					}catch(Exception e){
						e.printStackTrace();
						zkClient.delete(alivePath, -1);//ignore version;
						//再次校验
						selfWorkers.remove(job);
					}
				}catch(ClassNotFoundException e){
					e.printStackTrace();
					throw new RuntimeException(e);
				}
			}
			
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}
	
	
	class InnerZK implements Watcher {

		public void process(WatchedEvent event) {
			// 如果是“数据变更”事件
			if (event.getType() != EventType.None) {
				//processExt(event);
				return;
			}
			
			// 如果是链接状态迁移
			// 参见keeperState
			switch (event.getState()) {
			case SyncConnected:
				System.out.println("Connected...");
				// 链接状态迁移时,检测worker信息
				sync();
				break;
			case Expired:
				System.out.println("Expired...");
				break;
			// session过期
			case Disconnected:
				// 链接断开,或session迁移
				System.out.println("Connecting....");
				break;
			case AuthFailed:
				close();
				throw new RuntimeException("ZK Connection auth failed...");
			default:
				break;
			}
		}
		
	}
	
	/**
	 * 分配任务,在所有的worker信息都同步结束后,然后在逐个检测任务状态,对于没有
	 * 被执行的新任务,或者已经失去托管的任务,交付给其他sid。
	 * 
	 * 任务分配,没有采取“严格均衡”的方式,我们使用了一个随即方式。
	 */
	private void scheduler(){
		lock.lock();
		for(String job : allWorkers.keySet()){
			try{
				//如果没有,则创建一个持久节点,挂载数据为,系统时间戳,你可以为此节点加上ACL控制,但会带来复杂度
				//这里可以创建为临时节点,那么你需要对此节点注册watch,当watch触发时(比如其他sid的session失效等)做job的接管
				//考虑到如果大量的job,大量的watch,在网络复杂的情况下,再加上对zk的并发操作,数据一致性是个问题。
				//此处,我们采取挂载“时间戳”的方式,在SyncHandler线程中,间歇性的去检测,惰性的非实时的分配和协调任务
				//此处就要求,你的应用服务器的时间,应该几乎非常一致,如果你无法做到,请在此处增加一个操作分支,从一个统一的地方获得时间:比如DB中等
				String alivePath = "/" + serverType + "/" + job + ALIVE;
				List<String> children = zkClient.getChildren(alivePath, false);//如果节点不存在,则在下一次sync时被补救
				if(children == null || children.isEmpty()){
					//此job尚未分配
					String registerPath = "/" + serverType + "/" + job + REGISTER;
					List<String> rc = zkClient.getChildren(registerPath, false);
					//等待下一次sync时准备节点数据
					if(rc == null || rc.isEmpty()){
						continue;
					}
					Collections.shuffle(rc);//打乱顺序,随即,取出第一个,其实你可以有很多更好的手段来实现“任务均衡”,此处仅为参考
					String tsid = rc.get(0);
					try{
						byte[] data = String.valueOf(System.currentTimeMillis()).getBytes();
						zkClient.create(alivePath + "/" + tsid, data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
						//tsid对应的syncHandler此后将会检测并补救。此处只是分配给他。
						//如果tsid也是失去托管的,那么下一次sync检测将会发现并移除,此处不再做多余的校验;
						//在极端情况下,比如你的“任务托管过期时间”过短,或者你的系统发布过程很长,但是所有的任务都失去托管
						//那么最终将会有一台机器接管大部分job,如果job个数很多,将会出现“雪崩效应”;
						//如果你不能容忍这些事情的发生,请在此处增加有效的barrier操作(如果接管任务个数达到一定个数,将接受但不执行任务)
						//或者refuse操作(既不接管也不执行任务)。
						System.out.println("Job switch,SID:" + tsid + ",JOB :" + job);
					}catch(NodeExistsException e){
						//ignore;
					}
					continue;
				}
				//如果job已经被其他sid接管,那么检测接管者,是否处于活跃,如果存在多个子节点,其实是
				//一种异常情况,此处我们只做校验,冲突有sync解决
				for(String id : children){
					String tpath = alivePath + "/" + id;
					Stat stat = new Stat();
					byte[] data = zkClient.getData(tpath, false,stat);
					long time = Long.valueOf(new String(data));
					long current = System.currentTimeMillis();
					//如果一个任务,它的执行者在2分钟内都没有和zk交互(synSelfWorker方法中会更新time)
					//表明已经过期
					//为了便于测试,此处为15秒
					if(time + 1500 < current){
						try{
							zkClient.delete(tpath, stat.getVersion());
						}catch(BadVersionException e){
							//ignore
						}catch(NoNodeException e){
							//ignore;
						}
					}else{
						System.out.println(id + " :" + job);
					}
				}
			}catch(Exception e){
				e.printStackTrace();
			}
		}
		lock.unlock();
	}
	/**
	 * 任务同步线程,间歇性的检测zk持有的任务和本地任务是否一致
	 * 并负责分配任务
	 * @author qing
	 *
	 */
	class SyncHandler implements Runnable {

		public void run() {
			try {
				int i = 0;
				int l = 10;
				while (true) {
					synchronized (tag) {
						try{
							while(!scheduler.isStarted()){
								tag.wait();
							}
						}catch(Exception e){
							//
						}
					}
					System.out.println("Sync handler,running...tid: " + Thread.currentThread().getId());
					if (zkClient == null || (zkClient.getState() == States.NOT_CONNECTED || zkClient.getState() == States.CLOSED)) {
						lock.lock();
						try {
							// 回话重建等异常行为
							zkClient = new ZooKeeper(Constants.connectString, 3000, dw, false);
							System.out.println("Reconnected success!...");
						} catch (Exception e) {
							e.printStackTrace();
							i++;
							Thread.sleep(3000 + i * l);// 在zk环境异常情况下,每3秒重试一次
						} finally {
							lock.unlock();
						}
						continue;
					}
					if (zkClient.getState().isConnected()) {
						sync();//同步任务
						scheduler();//任务分配和过期检测
						Thread.sleep(3000);// 如果被“中断”,直接退出
						i = 0;
					}else{
						Thread.sleep(3000);
					}
				}
			} catch (InterruptedException e) {
				System.out.println("SID:" + sid + ",SyncHandler Exit...");
				close();
			}

		}
	}
	
	/**
	 * 调用者提交的任务,将会被同步的方式交付给zk。此线程就是负责从queue中获取调用者
	 * 提交的job,然后依次在zk环境中生成节点数据。
	 * @author qing
	 *
	 */
	class WorkerHandler implements Runnable{
		private Set<Worker> pending = new HashSet<Worker>();
		private int count = 0;//max = 20;
		
		/**
		 * 将worker信息生成zk节点数据
		 * @param worker
		 * @return
		 */
		private boolean register(Worker worker){
			lock.lock();
			//逐级创建其父节点
			String jobName = worker.getJob().getKey().getName();
			try{
				Transaction tx = zkClient.transaction();//使用事务的方式
				String jobPath = "/" + serverType + "/" + jobName;
				if(zkClient.exists(jobPath, false) == null){
					tx.create(jobPath, worker.getCronExpression().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}
				String registerPath = "/" + serverType + "/" + jobName+ REGISTER;
				if(zkClient.exists(registerPath, false) == null){
					tx.create(registerPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}
				String alivePath = "/" + serverType + "/" + jobName+ ALIVE;
				if(zkClient.exists(alivePath, false) == null){
					tx.create(alivePath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				}
				tx.create(registerPath + "/" + sid, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				tx.commit();
			}catch(NodeExistsException e){
				//ignore
			}catch(Exception e){
				e.printStackTrace();
				pending.add(worker);
				//对于异常数据,添加到一个补充操作队列,如果在操作中出现异常,那么将会在
				//补充操作中得到再次校验
			}
			lock.unlock();
			return true;
		}
		
		public void run(){
			try{
				while(true){
					synchronized (tag) {
						try{
							while(!scheduler.isStarted()){
								tag.wait();
							}
						}catch(Exception e){
							//
						}
					}
					System.out.println("Worker handler,running...");
					if(zkClient != null && zkClient.getState().isConnected()){
						System.out.println("Register...");
						Worker worker = outgoingWorker.take();
						register(worker);
						if(!pending.isEmpty()){
							Thread.sleep(500);
							Iterator<Worker> it = pending.iterator();
							while(it.hasNext()){
								boolean isOk = register(it.next());
								if(!isOk){
									count++;
									Thread.sleep(1000);
								}else{
									count = 0;
									it.remove();
								}
								//如果重试20次,仍无法成功,直接抛弃,非常遗憾
								if(count > 20){
									pending.clear();
								}
							}
						}
						
					}else{
						Thread.sleep(1000);
					}
				}
				
			}catch(InterruptedException e){
				System.out.println("SID:" + sid + ",WorkerHandler Exit...");
				close();
			}
		}
	}
	
	/**
	 * 全部删除当前serverType下所有的任务
	 */
	public void clear(){
		lock.lock();
		try{
			if(zkClient != null && zkClient.getState().isConnected()){
				zkClient.delete("/" + serverType, -1);
			}
//			if(scheduler != null && scheduler.isStarted()){
//				for(Worker worker : selfWorkers.values()){
//					scheduler.unscheduleJob(worker.getTrigger().getKey());
//				}
//			}
//			allWorkers.clear();
		}catch(Exception e){
			e.printStackTrace();
		}finally{
			lock.unlock();
		}
	}

}

 

其他辅助类,请参考附件中的源码,谢谢。

分享到:
评论
2 楼 QING____ 2015-06-01  
cpusoft 写道
是否可以考虑用zk的选举机制呢,先选举一个leader,然后这个leader执行实际的业务。
参见
http://www.infoq.com/cn/news/2014/06/qunaer-task-dispatching-center


原则上可以,其中apache 的curator提供了较好的解决方案,建议参考。
1 楼 cpusoft 2015-05-31  
是否可以考虑用zk的选举机制呢,先选举一个leader,然后这个leader执行实际的业务。
参见
http://www.infoq.com/cn/news/2014/06/qunaer-task-dispatching-center

相关推荐

    zookeeper实战:ConfigServer代码样例

    《Zookeeper实战:ConfigServer代码样例解析》 在分布式系统中,Zookeeper作为一个高可用的分布式协调服务,被广泛应用于配置管理、命名服务、分布式锁等场景。本篇文章将聚焦于Zookeeper的一个典型应用——Config...

    开源监控管理zookeeper工具:exhibitor-1.6

    Exhibitor是一款强大的开源工具,专门用于管理和监控ZooKeeper集群。在Zookeeper的应用场景中,确保服务的稳定性和高可用性至关重要,而Exhibitor则为此提供了直观且易用的界面。它允许用户轻松地配置、管理和监视...

    ZooKeeper-分布式过程协同技术详解 和从Paxos到Zookeeper

    《ZooKeeper:分布式过程协同技术详解》与《从Paxos到Zookeeper:分布式一致性原理与实践》这两本书深入探讨了分布式系统中的关键组件ZooKeeper及其背后的一致性算法Paxos。ZooKeeper是由Apache软件基金会开发的一个...

    知识领域:JAVA 技术关键词:Dubbo 内容关键词:ZooKeeper 用途:分布式系统的可靠协调系统

    -- 此rar文件包含两个ZooKeeper版本 一个3.4.6、一个... ZooKeeper代码版本中,提供了分布式独享锁、选举、队列的接口,代码在zookeeper-3.4.3\src\recipes。其中分布锁和队列有Java和C两个版本,选举只有Java版本。

    读书笔记:《netty+redis+zookeeper实战》附带代码.zip

    读书笔记:《netty+redis+zookeeper实战》附带代码

    Zookeeper实战

    《Zookeeper实战详解》 Apache ZooKeeper是一款分布式协调服务,它是集群管理的基石,用于解决分布式环境中的命名、配置管理、分布式同步等问题。本篇将深入探讨Zookeeper的实战应用,包括安装配置、集群搭建以及...

    zookeeper集群

    1. 安装ZooKeeper:首先需要下载ZooKeeper的安装包,例如zookeeper-3.4.9.tar.gz,然后将其解压到指定的安装目录中。 2. 配置ZooKeeper:在ZooKeeper的配置文件zoo.cfg中,需要修改数据目录、端口号、服务器列表等...

    从Paxos到Zookeeper:分布式一致性原理与实践

    从Paxos到Zookeeper:分布式一致性原理与实践,适合分布式系统各阶段学习,并对分布式架构有深入的理解与提高

    zookeeper实战

    Apache ZooKeeper是一个开源的分布式协调服务,它被设计用来管理大规模分布式系统的协调问题,如配置管理、命名、同步、群集状态管理和服务发现等。ZooKeeper以一种简单的方式维护配置信息、命名、提供分布式锁和...

    深入探索Zookeeper:实战应用与高效策略

    **深入探索Zookeeper:实战应用与高效策略** Zookeeper是一个分布式的、开放源码的分布式应用程序协调服务,它是集群的管理者,监视着集群中各个节点的状态根据节点提交的反馈进行下一步合理操作。最终将简单易用的...

    ZooKeeper-分布式过程协同技术详解 PDF 高清完整版

    第三部分(第9~10章)主要介绍ZooKeeper内部原理及如何运行ZooKeeper.第9章介绍ZooKeeper的作者们在设计时所采用的方案。第10章介绍如何对ZooKeeper进行配置。 购买地址:https://item.jd.com/11880788.html

    netty-redis-zookeeper高并发实战学习-netty-redis-zookeeper.zip

    在学习"Netty-Redis-Zookeeper高并发实战"的过程中,你可能会涉及到以下知识点: 1. Netty的基本概念和原理,包括其事件循环模型、ByteBuf的使用以及自定义编解码器的实现。 2. Redis的数据结构及其应用场景,如...

    【书籍学习】Netty、Redis、Zookeeper高并发实战-netty-redis-zookeeper.zip

    【书籍学习】Netty、Redis、Zookeeper高并发实战-netty-redis-zookeeper # netty-redis-zookeeper 【书籍学习】Netty、Redis、Zookeeper高并发实战

    使用ansible安装zookeeper

    本教程将详细讲解如何使用Ansible来安装Zookeeper,一个分布式协调服务,常用于管理Apache Hadoop和其他大数据组件。 **一、Ansible简介** Ansible是一款基于Python的自动化工具,它采用无代理的方式工作,通过SSH...

    zookeeper-3.4.14-1.x86_64.rpm for centos7

    zookeeper-3.4.14-1.x86_64.rpm centos7 Default locations binaries: /opt/zookeeper data: /var/lib/zookeeper logs: /var/log/zookeeper configs: /etc/zookeeper, /etc/sysconfig/zookeeper

    高可用之ZooKeeper集群的安装、配置、高可用测试

    高可用之 ZooKeeper 集群的安装、配置、高可用测试 ZooKeeper 是一个广泛使用的分布式协调服务,提供了高可用、可靠的服务注册中心解决方案。高可用之 ZooKeeper 集群的安装、配置、高可用测试是分布式系统架构中...

    ZooKeeper-:ZooKeeper源码剖析

    优秀时间学习了一下ZooKeeper:分布式过程协调这本书的内容,对ZooKeeper实现的细节很好奇,所以顺便把ZooKeeper源码看了一遍。看完之后想写点内容做个笔记,确实发现不好开始。由于ZooKeeper一个完整的逻辑的代码...

    zookeeper系列1:入门

    **Zookeeper系列1:入门** Zookeeper是一款分布式协调服务,由Apache基金会开发,广泛应用于分布式系统中的数据共享、配置管理、命名服务、集群同步等场景。它的设计目标是简化分布式环境下的复杂问题,提供高可用...

    zookeeperDemo:一个演示学习

    - **源代码**:Java源代码实现了与Zookeeper交互的客户端API,展示了如何创建会话、设置/获取节点数据、监听节点变化等操作。 - **配置文件**:包含Zookeeper的连接配置,如服务器地址、端口等。 - **测试用例**...

Global site tag (gtag.js) - Google Analytics