class Car{ private final int id; private boolean engine = false, driveTrain = false, wheels = false; public Car(int idn){ id = idn; } public Car(){ id = -1; } public synchronized int getId(){ return id; } public synchronized void addEngine(){ engine = true; } public synchronized void addDriveTrain(){ driveTrain = true; } public synchronized void addWheels(){ wheels = true; } public synchronized String toString(){ return "Car " +id +"[ engine: " + engine + " driveTrain: " + driveTrain + " wheels: "+ wheels+ " ]"; } } class CarQueue extends LinkedBlockingQueue<Car>{} class ChassisBuilder implements Runnable{ private CarQueue carQueue; private int counter = 0; public ChassisBuilder(CarQueue cq){ carQueue =cq; } public void run(){ try { while(!Thread.interrupted()){ TimeUnit.MILLISECONDS.sleep(500); Car c = new Car(counter++); System.out.println("ChassisBuilder created " +c); // Insert into queue carQueue.put(c); } } catch (InterruptedException e) { System.out.println("Interrupted: ChassisBuilder"); } System.out.println("ChassisBuilder off"); } } class Assembler implements Runnable{ private CarQueue chassisQueue, finishingQueue; private Car car; private CyclicBarrier barrier = new CyclicBarrier(4); private RobotPool robotPool; public Assembler(CarQueue cq, CarQueue fq, RobotPool rp){ chassisQueue = cq; finishingQueue = fq; robotPool = rp; } public Car car(){ return car; } public CyclicBarrier barrier(){ return barrier; } public void run(){ try { while(!Thread.interrupted()){ // Blocks until chassis is available; car = chassisQueue.take(); // Hire robots to perform work; robotPool.hire(EngineRobot.class, this); robotPool.hire(DriveTrainRobot.class, this); robotPool.hire(WheelRobot.class, this); barrier.await(); // Unitl the robots finish // Put car into finishedQueue for futher work finishingQueue.put(car); } } catch (InterruptedException e) { System.out.println("Exiting Assembler via interrupt"); } catch (BrokenBarrierException e) { // This one we want to know about throw new RuntimeException(e); } System.out.println("Assembler off"); } } class Reporter implements Runnable{ private CarQueue carQueue; public Reporter(CarQueue cq){ carQueue = cq; } public void run(){ try { while(!Thread.interrupted()){ System.out.println(carQueue.take()); System.out.println(); } } catch (InterruptedException e) { System.out.println("Exiting Reporter via interrupt"); } System.out.println("Reporter off"); } } abstract class Robot implements Runnable{ private RobotPool pool; public Robot(RobotPool p){ pool = p; } protected Assembler assembler; public Robot assignAssembler(Assembler assembler){ this.assembler = assembler; return this; } private boolean engage = false; public synchronized void engage(){ engage = true; notifyAll(); } // The part of run() that's different for each robot; abstract protected void performService(); public void run(){ try { powerDown(); // Wait unitll needed while(!Thread.interrupted()){ performService(); assembler.barrier().await(); // Synchronized // We're done with that job... powerDown(); } } catch (InterruptedException e) { System.out.println("Exiting "+ this+ " via interrupt"); } catch (BrokenBarrierException e) { // This one we want to know about throw new RuntimeException(e); } System.out.println(this +" off"); } private synchronized void powerDown() throws InterruptedException{ engage = false; assembler = null; // Disconnet from the Assembler; // Put ourserlves back in the available pool; pool.release(this); while(engage == false) // Power down wait(); } @Override public String toString() { return getClass().getName(); } } class EngineRobot extends Robot{ public EngineRobot(RobotPool pool){ super(pool); } protected void performService(){ System.out.println(this +" installing engine");; } } class DriveTrainRobot extends Robot{ public DriveTrainRobot(RobotPool pool){ super(pool); } protected void performService(){ System.out.println(this+" installing DriveTrain");; } } class WheelRobot extends Robot{ public WheelRobot(RobotPool pool){ super(pool); } protected void performService(){ System.out.println(this + " installing Wheels");; } } class RobotPool{ // Quietly prevents identical entries; private Set<Robot> pool = new HashSet<Robot>(); public synchronized void add(Robot r){ pool.add(r); notifyAll(); } public synchronized void hire(Class<? extends Robot> robotType, Assembler d) throws InterruptedException{ for(Robot r : pool) if(r.getClass().equals(robotType)){ pool.remove(r); r.assignAssembler(d); r.engage(); // Power it up to do the task return; } wait(); // None available hire(robotType, d); // Try again, recursively } public synchronized void release(Robot r){ add(r); } } public class CarBuilder { public static void main(String[] args) throws InterruptedException { CarQueue chassisQueue = new CarQueue(), finishingQueue = new CarQueue(); ExecutorService exec = Executors.newCachedThreadPool(); RobotPool robotPool = new RobotPool(); exec.execute(new EngineRobot(robotPool)); exec.execute(new DriveTrainRobot(robotPool)); exec.execute(new WheelRobot(robotPool)); exec.execute(new Assembler(chassisQueue, finishingQueue, robotPool)); exec.execute(new Reporter(finishingQueue)); // Start everything running by producing chassis; exec.execute(new ChassisBuilder(chassisQueue)); TimeUnit.SECONDS.sleep(7); exec.shutdownNow(); } }Car 是经由CarQueue从一个地方传送到另一个地方的, CarQueue是一种LinkedBlockingQueue类型。 ChassisBuilder创建了一个未加修饰的Car, 并把它放到了一个CarQueue中。 Assembler从一个CarQueue中取走Car, 并雇请Robot对其加工。 CyclicBarrier使Assembler等待。直至所有的Robot都完成, 并且在那一时刻它会将Car放置到将离开CarQueue中,然后被传送 到下一个操作。最总的CarQueue的消费者是一个Report对象, 它只打印Car, 以显示所有的任务都已经正确的完成了。 Car将其所有方法都设置成了synchronized的。 正如它所表现出来的那样, 在本例中,这是多余的, 因为在工程的内部, Car是通过队列一动的, 并且在任何时, 只有一个任务能够在某辆车上工作。 基本上,队列可以强制串行化地访问Car。但是这正是你可能会落入的陷阱-- 你可能会说“让我们尝试着通过不对Car类同步来进行优化, 因为看起来 Car在这里并不需要同步。 但是当这个系统连接到另一个需要Car被同步 的系统时,它就会崩溃。 // 即Car可能会被多个线程使用,因此我们需要以明显的方式使其成为线程安全的
