浏览 2983 次
精华帖 (0) :: 良好帖 (0) :: 新手帖 (0) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2010-02-02
生产者消费者模式无疑是个重要的架构实现模式... 代码使用例子: new DemoPC().startup(); //============================ import java.util.Arrays; import java.util.List; public class DemoPC extends ProducerConsumerSupport { static class ProductWrapper implements Product { String pid; public ProductWrapper(String pid) { this.pid = pid; } } @Override public void consume(Product product) { System.out.println("正在消费产品【" + product + "】!"); } @Override public void handleConsumerException(Exception e, Product product) { System.err.println("消费产品【" + product + "】失败!原因:" + e.getMessage()); } @Override public void handleProducerException(Exception e) { System.err.println("生产失败..."); } @Override public List<Product> produce() { return Arrays.asList(new Product[] { new ProductWrapper("p1"), new ProductWrapper("p2"), new ProductWrapper("pn") }); } } /** * 对生产者消费者模型的抽象 * * @author gzit-team * */ public abstract class ProducerConsumerSupport { private int sleepIfEmptyProduct = 5 * 1000; // 没有取得产品时,休眠的时间 private int sleepIfExceptionRaise = 60 * 1000;// 如果出现异常工作线程做适当的休眠 private boolean sleepWhenExceptionRaise; // 出现异常时是否进行适当的休眠 private int producerSize = 1;// 生产者个数 private int consumerSize = 1;// 消费者个数 private int queenSize = 10; // 等待队列的大小 // 系统运行的几种状态 public final int STATE_INIT = 0; public final int STATE_PAUSED = 1; public final int STATE_RUNNING = 2; public final int STATE_SHUTDOWN = 3; // 系统的状态 private int state = STATE_INIT; // 空的对象列表,做占位用 public static List<Product> EMPTY_PRODUCT_LIST = new ArrayList<Product>(); // 产品队列 private BlockingQueue<Product> produceQueen; // 生产者,消费者队列 private List<Producer> produerList; private List<Consumer> consumerList; // 生产者 class Producer extends PeriodicalTask { public Producer() { this(0); } public Producer(int period) { super(period); } // 可以在这里处理异常 public void handleException(Exception e) { handleProducerException(e); if (sleepWhenExceptionRaise) { try { Thread.sleep(sleepIfExceptionRaise); } catch (InterruptedException e1) { // ignore } } } @Override protected void doWork() throws Exception { List<Product> products = produce(); for (Product p : products) { // 有可能发生阻塞 produceQueen.put(p); } // 如果生产者取得的数据为空,则适当进行休眠,以免出现狂操作 if (products.isEmpty()) { Thread.sleep(sleepIfEmptyProduct); } } } // 消费者 class Consumer extends PeriodicalTask { private Product curProduct; public Consumer() { this(0); } // 可以在这里处理异常 public void handleException(Exception e) { handleConsumerException(e, this.curProduct); if (sleepWhenExceptionRaise) { try { Thread.sleep(sleepIfExceptionRaise); } catch (InterruptedException e1) { // ignore } } } public Consumer(int period) { super(period); } @Override protected void doWork() throws Exception { // 有可能发生阻塞 Product product = produceQueen.take(); curProduct = product; consume(product); } } // 由子类来实现生成和消费的具体逻辑 public abstract List<Product> produce(); public abstract void consume(Product product); public abstract void handleProducerException(Exception e); public abstract void handleConsumerException(Exception e, Product product); // 启动之前做的事情 public void preStartup() { } public void startup() { if (state == STATE_RUNNING) { throw new IllegalStateException("系统已经处于运行状态,不能够再启动!state=" + this.state); } this.preStartup(); // 创建相关的队列和线程 produceQueen = new LinkedBlockingQueue<Product>(this.queenSize); produerList = new LinkedList<Producer>(); consumerList = new LinkedList<Consumer>(); for (int i = 0; i < this.producerSize; i++) { Producer producer = new Producer(); produerList.add(producer); producer.start(); } for (int i = 0; i < this.consumerSize; i++) { Consumer consumer = new Consumer(); consumerList.add(consumer); consumer.start(); } state = STATE_RUNNING; } // /////////////////////////几个管理接口///////////////////// public void pause() { if (state != STATE_RUNNING) { throw new IllegalStateException("系统不处于运行状态,不能暂停!state=" + this.state); } for (Producer p : produerList) { p.pause(); } for (Consumer c : consumerList) { c.pause(); } state = STATE_PAUSED; } public void resume() { if (state != STATE_PAUSED) { throw new IllegalStateException("系统不处于暂停状态,不能恢复!state=" + this.state); } for (Producer p : produerList) { p.resume(); } for (Consumer c : consumerList) { c.resume(); } state = STATE_RUNNING; } public void shutdown() { if (state != STATE_RUNNING) { throw new IllegalStateException("系统不处于运行状态,不能关闭!state=" + this.state); } for (Producer p : produerList) { p.shutdown(); } for (Consumer c : consumerList) { c.shutdown(); } state = STATE_SHUTDOWN; } // ////////////////////////////////////// public int getProducerSize() { return producerSize; } public void setProducerSize(int producerSize) { this.producerSize = producerSize; } public int getConsumerSize() { return consumerSize; } public void setConsumerSize(int consumerSize) { this.consumerSize = consumerSize; } public int getQueenSize() { return queenSize; } public void setQueenSize(int queenSize) { this.queenSize = queenSize; } public BlockingQueue<Product> getProduceQueen() { return produceQueen; } public void setProduceQueen(BlockingQueue<Product> produceQueen) { this.produceQueen = produceQueen; } public List<Producer> getProduerList() { return produerList; } public void setProduerList(List<Producer> produerList) { this.produerList = produerList; } public List<Consumer> getConsumerList() { return consumerList; } public void setConsumerList(List<Consumer> consumerList) { this.consumerList = consumerList; } public int getSleepIfEmptyProduct() { return sleepIfEmptyProduct; } public void setSleepIfEmptyProduct(int sleepIfEmptyProduct) { this.sleepIfEmptyProduct = sleepIfEmptyProduct; } public int getSleepIfExceptionRaise() { return sleepIfExceptionRaise; } public void setSleepIfExceptionRaise(int sleepIfExceptionRaise) { this.sleepIfExceptionRaise = sleepIfExceptionRaise; } public boolean isSleepWhenExceptionRaise() { return sleepWhenExceptionRaise; } public void setSleepWhenExceptionRaise(boolean sleepWhenExceptionRaise) { this.sleepWhenExceptionRaise = sleepWhenExceptionRaise; } } /** * 定時处理任务 * * @author gzit-team * */ public abstract class PeriodicalTask implements Runnable { protected int period; private AtomicBoolean running = new AtomicBoolean(false); private AtomicBoolean paused = new AtomicBoolean(false); // 定時寫統計結果的線程 private Thread taskThread; private boolean daemon; public PeriodicalTask(int period) { this.period = period; } public void run() { while (running.get() && !paused.get()) { try { Thread.sleep(this.period); this.doWork(); } catch (Exception e) { // 处理异常 try { this.handleException(e); } catch (Exception e1) { // 异常处理再失败,则忽略掉 // 线程不能死... } } } } public void shutdown() { running.set(false); try { this.shutdownHook(); this.taskThread.interrupt(); } catch (Exception e) { // ignore // 必须要关的掉 } } protected void shutdownHook() { } public void pause() { this.paused.set(true); } public void resume() { this.paused.set(false); } public boolean isPaused() { return paused.get(); } public void start() { this.running.set(true); this.taskThread = new Thread(this); this.taskThread.setDaemon(this.daemon); this.taskThread.setName(this.getClass().getSimpleName()); this.taskThread.start(); } // 可以在这里处理异常 public void handleException(Exception e) { } // 子类进行的具体工作 protected abstract void doWork() throws Exception; public boolean isDaemon() { return daemon; } public void setDaemon(boolean daemon) { this.daemon = daemon; } } 声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |