之前在学习erlang OTP的时候,看到在OTP中实现了工人-监工模式,就是在定义一个工作者进程的时候,同时为器分配了一个overseer(监工),监工啥事儿也不作,就专门负责工作者进程是否正常工作,有无任务异常情况发生,当时看到在机制觉得不以为然觉得就这么几行代码没什么大不了的。
当我最近用java代码来实现分布式编程的时候发现要做一个稳定的,可靠的系统并不是那么容易,之前在单机系统中默认不会不会出任何问题的共享变量操作,进程间通信。在分布式系统中zookeeper锁操作异常,远程socke长连接断连,一切都变得不是那么可靠了,甚至要默许这种异常成为常态。
但是系统还要保证是4个9的稳定性,按照以前的策略是,发生异常的地方,进行函数重试等,于是乎系统的代码开始变得冗余,杂乱起来。
erlang OTP的策略是,默认异常是一种常态,出问题了没有关系,只要将整个进程重启,重启的逻辑由监工进程负责,这样对于工作进程内部执行的逻辑还是高内聚的,清晰的。初始化的时候还可以设置监工进程的重启规则,比如一分钟之内如果超过5次异常,就认为系统中的崩溃啦,就会执行终止业务进程的逻辑。
按照这个逻辑,我在java中也简单实现了监工-工人模式,代码如下:
import java.util.concurrent.ExecutorService; import java.uil.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** * 工人和监工模式实现 */ public class TaskOverseer { private final OverSeer oversee; private final long timeInterval; private static final ExecutorService threadPool = Executors .newCachedThreadPool(); private int maxErrorCount; public TaskOverseer(long duration, TimeUnit timeunit, int maxErrorCount) { super(); this.maxErrorCount = maxErrorCount; this.timeInterval = timeunit.toMillis(duration); this.oversee = new OverSeer(); this.oversee.setDaemon(true); this.oversee.start(); } private final AtomicInteger count = new AtomicInteger(); private final AtomicLong timestampe = new AtomicLong(System .currentTimeMillis()); protected void startWork() throws Exception { } protected void recycleResource() { } private class OverSeer extends Thread { public OverSeer() { super("Overseer thhread"); } @Override public void run() { try { while (true) { synchronized (this) { System.out.println("launch a worker task"); threadPool.execute(new Worker()); this.wait(); } } } catch (InterruptedException e) { throw new RuntimeException(e); } } } private class Worker implements Runnable { @Override public void run() { try { startWork(); } catch (Throwable e) { synchronized (oversee) { recycleResource(); final long currentTimestamp = System.currentTimeMillis(); if ((currentTimestamp <= (timestampe.get() + timeInterval)) && count.incrementAndGet() > maxErrorCount) { // 终止监工进程 oversee.interrupt(); throw new RuntimeException("between 1 min " + count.get() + " errors occur", e); } else { if (currentTimestamp > (timestampe.get() + timeInterval)) { count.set(0); timestampe.set(currentTimestamp); } } oversee.notifyAll(); } } } } }
只需继承TaskOverseer类,在构造函数上设置,在多少时间内抛最多多少个异常,如果超过了这个异常数目,系统就会终止。然后按照业务需要覆写startWork这个函数,在里面添加一些业务代码。
例如:
public static void main(String[] args) throws Exception { TaskOverseer overseer = new TaskOverseer(1l, TimeUnit.MINUTES, 4) { @Override protected void startWork() throws Exception { Thread.sleep(1000 * 20); throw new Exception("i am die"); } }; Thread.sleep(10000 * 99); }
按照如上构造函数中定义的,一分钟之内最多发生四次异常,超过这个数目系统就会奔溃。
在startWork方法中会20秒抛出一个异常,一分钟之内最多抛三个异常,所以系统将这样一直运行下去,如果将线程sleep的时间小于15秒的话,系统一段时间之后就会自动奔溃。
如果用这样的策略来处理分布式环境下的异常,相信可以构建出一个更加稳定健壮的系统。