- 浏览: 66553 次
- 性别:
- 来自: 杭州
最新评论
-
westboy172887564:
好久之前的帖子了,不知道还会不会回复啊,有开发文档么?
迟到的开源 -
lieyan2024:
正在学习单例模式, 看了你的文章有收获,有不解.单例模式确实很 ...
架构、框架、模式之轻松掌握设计模式(单例模式) -
raoyutao:
DCS member.
关于Spring -
凤舞凰扬:
raoyutao 写道黄教授,今天在这上面突然发现了你,好欣喜 ...
关于Spring -
raoyutao:
黄教授,今天在这上面突然发现了你,好欣喜呢
关于Spring
昨天上来看看,看到有个童鞋发了篇关于线程池的实现的帖子,也引来了不少讨论。呵呵,初步看了下,那个线程池的设计与实现还是比较初级,并且存在的问题还是蛮多的。刚好,前两者,为一个项目设计实现了一个基于生产-消费者模式的任务异步处理组件,中间用到了线程池,发上来,供一些童鞋学习参考下。
有些童鞋可能会说,在JDK1.5后就带了ExecutorService这样的线程池,干嘛还自己实现啊?这里,我就先简单说一下背景情况和设计的思路先。
1. JDK的ExecutorService中的线程池只是提供了一些基础的实现,进入线程池的任务一般有两种行为:阻塞或者激活新的线程,前者是对于fixedThreadPool而言,而后者是对于cachedTreadPool。而项目需要的是一个具有伸缩性的。这里包括两个方面,一个是伸缩性的工作线程池(可以根据情况对线程池进行自我调节),二是伸缩性的任务队列(具有固定的大小,多余的任务会被暂时转储,而队列空闲至一个阈值时,从恢复转储的任务)。Commons-pool其实实现了第一个需求,但是它在设计上是存在一些问题,并不太适合用于线程池的管理(改篇可以再行讨论)。
2. 对于任务队列好,对于工作线程好,一个具有良好设计组件的前提是还有这么几个需求的:它自身是可以被Audit,也就是它的性能,实际工作质量是可以被检查和评估的;它的行为是可以被扩展的;它必须是健壮的并且可控的。所以,除了生产-消费者的线程池外,还必须有一些管理线程,它们必须能够良好地反馈和控制线程池;其次对于整个组件的活动必须定义一套事件以及事件监听机制,其目标一是能做到组件状态的自省,二是能提供客户端的扩展(比如实现动态任务链等)。
好了,谈了背景,先简单减少一下组件中几个主要角色的构成吧(因为整个组件约有60个类,所以不可能全部说明和贴出来):
1. WorkEngine: 除了继承Switchable这一一个控制开关外,它包括三个主要组成部分和三个扩展部分。
三个组成部分也就是组件的核心:任务队列、工作线程代理、任务结果处理队列。
三个扩展部分分别是:配置、持久化接口以及控制钩子接口。
public interface Switchable { void cancelWork() ; String getId() ; boolean isStartForWork() ; void startWork() ; void stopWork() ; } public interface WorkEngine extends Switchable { void addControlHook(Switchable hook) ; WorkConfiguration getConfiguration() ; Persistence getPersistence() ; TaskReportQueue getReportQueue() ; TaskQueue getTaskQueue() ; WorkerBroker getWorkerBroker() ; }
2. 下面对三个主要的部件接口简单说明下:
首先是任务队列TaskQueue,相对于传统的队列,增加了事件监听器和任务优先级重处理。
public interface TaskQueue extends Iterable<Task> { void addEventListener(TaskEventListener listener) ; /** * add a new task to tail of queue * @param task */ void addTask(Task task) ; /** * check whether existing task in queue. * @return */ boolean existTask() ; /** * sort tasks in queue according to priority */ void sequence() ; /** * remove the task at the head of queue */ Task removeTask() ; /** * remove the indicated task in queue * @param task */ void removeTask(Task task) ; int size() ; int capacity() ; }
其次是工作线程代理(有个不错的名字:包工头)以及工作线程接口(也就是工人):
public interface WorkerBroker { void fireWorker(); void fireWorker(Worker worker); int getIdleWorkers(); int getWorkingWorkers(); int getMaxWorkers(); boolean hireWorker(Worker worker); Worker require(boolean overdue); boolean returnBack(Worker worker); void setWorkerFactory(WorkerFactory factory) ; void addEventListener(WorkerEventListener listener) ; } public interface Worker extends Switchable { boolean isAvailable(); boolean isStartForWork() ; void setTaskEventNotifier(TaskEventNotifier notifier); void setWorkerEventNotifier(WorkerEventNotifier notifier); void work(Task task); int getNumOfExecutedTasks() ; }
最后是任务报告队列,用于处理每个异步任务执行结果的报告
public interface TaskReportQueue extends Iterable<TaskExecutionReport> { void submitReport(TaskExecutionReport report) ; int capacity(); int size() ; }
3. 最后贴出组件配置的接口,这样大家也清晰有哪些是可配置的
public interface WorkConfiguration { /** * when the size of task queue is less than a specific threshold * engine will activate task from persistence. * the default threshold is getTaskQueueCapacity * 0.8. * if return value is 0, will consider to use default setting. * @return */ int getActivateThreshold() ; /** * * @return map key is group name, value is collection of task category */ Map<String, List<String>> getTaskGroupInformation() ; Map<String, Map<String, Class<? extends TaskContextConverter>>> getDefinedConverters() ; /** * the intial size of worker queue * @return */ int getInitNumberofWorkers(); /** * get the interval to check whether existing persistented tasks. * the unit is second. * The default value is 300. * @return */ int getIntervalToCheckPersistence() ; /** * the waitting time to require a worker from broker. * @return */ int getLatencyTimeForWorker(); /** * get apache common logging instance. * @return */ Log getLog(); /** * the size of worker queue * @return */ int getMaxNumberofWorkers(); /** * when found a exception (not user thrown), max retry times. * @return */ int getMaxRetryTimesWhenError() ; /** * the number of result processor thread * @return */ int getNumberofResultProcessors(); /** * 0 is unlimited, 1 is single thread, positive is a fixed thread pool, negative is illegal * @return */ int getNumberofServiceThreads() ; Class<? extends Persistence> getPersistenceClass(); /** * the task will promote the priority after a specific time elapsed. * the time unit is second. * @return specific time */ int getPriorityAdjustmentThreshold() ; /** * the size of task result queue * @return */ int getResultQueueCapacity(); List<Class<? extends TaskEventListener>> getTaskEventListeners(); Map<String, Map<TaskProcessLifeCycle, Class<?>>> getTaskCategoryInformation(); /** * the overdue setting of executing a task * the time unit is second * @return */ int getTaskOverdue(); /** * the size of task queue * @return */ int getTaskQueueCapacity(); List<Class<? extends WorkerEventListener>> getWorkerEventListeners(); /** * whether the system can automatically support the time based priority adjustment of task . * @return */ boolean supportPriorityAdjustment() ; /** * when worker queue is emtpy, whether to use backup worker. * @return */ boolean useBackupWorker(); }
好了,列出上面几个重要的接口,下面也就是大家最关心的实现了。在实现是基于JDK1.5,所以用到了Queue, Atomic等。
首先是TaskQueueImpl
public final class TaskQueueImpl extends ControlSwitcher implements TaskQueue, TaskEventNotifier, WorkerBrokerAware { /* * * * @uml.property name="broker" * @uml.associationEnd */ private WorkerBroker broker; private int capacity ; /* * * * @uml.property name="chain" * @uml.associationEnd */ private TaskEventListenerChain chain = new TaskEventListenerChain(); private Log logger ; private BlockingQueue<Task> queue; private int threshold ; private long lastCheckTime ; private long intervalToCheck ; private AtomicBoolean existPersistentTask = new AtomicBoolean(false); public TaskQueueImpl() { super(); } void activateTasks(int size) { if (!existPersistentTask.get()) { if (lastCheckTime <= 0L || System.currentTimeMillis() - lastCheckTime <= intervalToCheck) { return ; } } logger.info("the queue is available, expect to activate ["+size+"] tasks."); lastCheckTime = System.currentTimeMillis() ; Collection<Task> tasks = getEngine().getPersistence().activate(size); logger.info("actual activated "+tasks.size()+" tasks."); for (Task task : tasks) { putInQueue(task); } if (tasks.size() < size) { existPersistentTask.set(false); } } public void addEventListener(TaskEventListener listener) { chain.addListener(listener); } public void addTask(Task task) { if (queue.size() > capacity) { passivateTask(task); return; } putInQueue(task); } private void cancelTask(Task task) { task.setState(TaskState.OUT_OF_QUEUE); chain.onEvent(task, TaskEvent.CANCEL); } public boolean existTask() { return queue.size() > 0; } @Override public Runnable getManagementThread(final WorkConfiguration config) { final TaskEventNotifier notifier = this ; final WorkerBroker broker = this.broker ; final Log logger = this.logger ; final BlockingQueue<Task> queue = this.queue ; final int capacity = this.capacity ; final int threshold = this.threshold ; return new Runnable() { public void run() { while (isStartForWork()) { Worker worker = broker.require(true); if (worker == null) { logger.warn("can not require any worker in specific time."); worker = broker.require(false); } if (logger.isDebugEnabled()) { logger.info("required a worker[id="+worker.getId()+"]."); } Task task = null ; try { if (queue.size() <= threshold) { activateTasks(capacity - threshold); } task = queue.take(); } catch (InterruptedException e) { logger.warn("The task queue is interrupted, engine is still start for work[" + isStartForWork() + "]."); } if (task == null) { logger.warn("no found any task in queue in specific time, will return back worker."); broker.returnBack(worker); continue ; } task.setOutQueueTime(DateFormatter.now()); try { if (!worker.isStartForWork()) { if (logger.isDebugEnabled()) { logger.info("start worker[id="+worker.getId()+"]"); } worker.startWork() ; } worker.setTaskEventNotifier(notifier); worker.work(task); } catch (Throwable th) { logger.warn("found exception when the worker[id="+worker.getId()+"] start to work.",th); broker.returnBack(worker); } } } }; } public Iterator<Task> iterator() { Task[] ts =queue.toArray(new Task[queue.size()]); return Arrays.asList(ts).iterator() ; } public void notifyTaskEvent(Task task, TaskEvent event) { if (event == TaskEvent.FAIL) { int max = getConfiguration().getMaxRetryTimesWhenError(); int current = task.getRetriedTimes(); if (max > current) { task.setRetriedTimes(current+1); addTask(task); } else { logger.warn("the task " + task + " has retried " + current + " times, will be skipped."); } } chain.onEvent(task, event); } private void passivateTask(Task task) { getEngine().getPersistence().passivate(task); existPersistentTask.set(true) ; task.setState(TaskState.OUT_OF_QUEUE); chain.onEvent(task, TaskEvent.PERSISTENT); } public void postInitial() { capacity = getConfiguration().getTaskQueueCapacity(); threshold = getConfiguration().getActivateThreshold(); if (threshold <= 0 || threshold >= capacity) { Double dou = Double.valueOf(capacity * 0.8); threshold = dou.intValue() ; } queue = new PriorityBlockingQueue<Task>(capacity, new PriorityBasedComparator()); chain.setService(getEngine().getExecutorService()); logger = getConfiguration().getLog() ; intervalToCheck = getConfiguration().getIntervalToCheckPersistence() * 1000 ; } void putInQueue(Task task) { task.setInQueueTime(DateFormatter.now()); if (queue.offer(task)) { task.setState(TaskState.WAITING); chain.onEvent(task, TaskEvent.IN_QUEUE); } else { logger.warn("fail to put task " + task + " in queue."); } } public Task removeTask() { Task task = queue.poll(); if (task != null) { task.setOutQueueTime(DateFormatter.nowDate()); cancelTask(task); } else { if (logger.isDebugEnabled()) { logger.info("no task in queue."); } } return task ; } public void removeTask(Task task) { if (queue.remove(task)) { task.setOutQueueTime(DateFormatter.nowDate()); cancelTask(task); } else { logger.warn("remove task from queue failed."); } } /* * * * @param broker * @uml.property name="broker" */ public void setBroker(WorkerBroker broker) { this.broker = broker; } public int capacity() { return capacity; } public int size() { return queue.size(); } }
接下来是AbstractWorker 和WorkerBrokerImpl
public abstract class AbstractWorker extends ControlSwitcher implements Worker, WorkerBrokerAware { private Log logger ; /* * * * @uml.property name="taskEventNotifier" * @uml.associationEnd */ private TaskEventNotifier taskEventNotifier ; /* * * * @uml.property name="workerEventNotifier" * @uml.associationEnd */ private WorkerEventNotifier workerEventNotifier ; /* * * * @uml.property name="broker" * @uml.associationEnd */ private WorkerBroker broker ; private final AtomicBoolean available = new AtomicBoolean(true) ; private final AtomicInteger executedTasks = new AtomicInteger(0); protected void doWork(Task task) { available.set(false) ; task.setExecuteTime(DateFormatter.now()); TaskExecutionReport report = new TaskExecutionReport(task); TaskProcessDescriptor desc = getEngine().getTaskCategoryInformation(task.getCategory()); long start = System.currentTimeMillis(); try { if (desc == null) { throw new IllegalArgumentException("the task category["+task.getCategory()+"] doesn't register."); } TaskEventNotifier ten = getTaskEventNotifier(); TaskValidator validator = desc.getValidator(); if (validator != null) { ValidationResult vr = validator.validate(task); if (!vr.isValid()) { InvalidTaskResult defaultResult = new InvalidTaskResult(task, vr) ; report.setResult(defaultResult); getLogger().warn("the task" + task + " is invalid, caused by "+vr.getMessage()); if (ten != null) { ten.notifyTaskEvent(task, TaskEvent.INVALID); } getEngine().getReportQueue().submitReport(report); return ; } } report.setExecuteTime(new Date(start)); TaskResult result = desc.getExecutor().execute(task); report.setResult(result); report.setCost(System.currentTimeMillis() - start); if (ten != null) { if (report.getCost() > getConfiguration().getTaskOverdue() * 1000) { ten.notifyTaskEvent(task, TaskEvent.OVERDUE); } else { ten.notifyTaskEvent(task, TaskEvent.FINISH); } } getEngine().getReportQueue().submitReport(report); } catch (Throwable exp) { handleException(report, exp); } finally { executedTasks.addAndGet(1); available.set(true); } } private void handleException(TaskExecutionReport report, Throwable exp) { Task task = report.getTask() ; TaskEventNotifier ten = getTaskEventNotifier(); UnexpectedTaskResult defaultResult = new UnexpectedTaskResult(task); defaultResult.setException(exp); report.setResult(defaultResult); getLogger().error("found exception when executing task " + task, exp); if (ten != null) { getTaskEventNotifier().notifyTaskEvent(task, TaskEvent.FAIL); } } /* * * * @return * @uml.property name="logger" */ public Log getLogger() { return logger ; } /* * * * @return * @uml.property name="taskEventNotifier" */ TaskEventNotifier getTaskEventNotifier() { return taskEventNotifier; } /* * * * @return * @uml.property name="workerEventNotifier" */ WorkerEventNotifier getWorkerEventNotifier() { return workerEventNotifier; } public void postInitial() { logger = getConfiguration().getLog() ; } /* * * * @param notifier * @uml.property name="taskEventNotifier" */ public void setTaskEventNotifier(TaskEventNotifier notifier) { this.taskEventNotifier = notifier ; } /* * * * @param notifier * @uml.property name="workerEventNotifier" */ public void setWorkerEventNotifier(WorkerEventNotifier notifier) { this.workerEventNotifier = notifier ; } public boolean isAvailable() { return available.get(); } /* * * * @return * @uml.property name="broker" */ public WorkerBroker getBroker() { return broker; } /* * * * @param broker * @uml.property name="broker" */ public void setBroker(WorkerBroker broker) { this.broker = broker; } public int getNumOfExecutedTasks() { return executedTasks.get(); } @Override public synchronized void startWork() { super.startWork(); executedTasks.set(0); } }
public class WorkerBrokerImpl extends ControlSwitcher implements WorkerBroker, WorkerEventNotifier { private AtomicReference<Worker> agent = new AtomicReference<Worker>(); /* * * * @uml.property name="chain" * @uml.associationEnd */ private WorkerEventListenerChain chain = new WorkerEventListenerChain(); /* * * * @uml.property name="factory" * @uml.associationEnd */ private WorkerFactory factory; private AtomicReference<Long> firstAssign = new AtomicReference<Long>(0L); private Log logger ; private BlockingQueue<Worker> queue; private Semaphore sema ; private BlockingQueue<WorkerTracker> workingQueue ; public WorkerBrokerImpl() { super(); } public void addEventListener(WorkerEventListener listener) { chain.addListener(listener); } /* * * @see com.oocllogistics.comp.workengine.worker.WorkerBroker#fireWorker() */ public void fireWorker() { int balance = queue.size() ; int using = sema.getQueueLength(); int initWorkers = getConfiguration().getInitNumberofWorkers(); if (balance + using > initWorkers && using < initWorkers) { int fireSize = balance + using - initWorkers ; for (int i = 0; i < fireSize; i++) { Worker worker = queue.poll(); if (worker == null) { break; } if (logger.isDebugEnabled()) { logger.info("fire a worker[id="+worker.getId()+"] from queue."); } if (worker.isStartForWork()) { logger.info("stop work of worker[id="+worker.getId()+"]."); worker.stopWork(); } chain.onEvent(worker, WorkerEvent.FIRE); } } } public void fireWorker(Worker worker) { if (worker == null) { return ; } queue.remove(worker); removeFromWorkingQueue(worker); if (logger.isDebugEnabled()) { logger.info("fire a worker[id="+worker.getId()+"] from queue."); } if (worker.isStartForWork()) { logger.info("stop work of worker[id="+worker.getId()+"]."); worker.stopWork(); } chain.onEvent(worker, WorkerEvent.FIRE); } /* * * @see com.oocllogistics.comp.workengine.worker.WorkerBroker#getIdleWorkers() */ public int getIdleWorkers() { return queue.size(); } @Override public Runnable getManagementThread(final WorkConfiguration config) { final long max = config.getTaskOverdue() * 1000 ; final int initial = getConfiguration().getInitNumberofWorkers() ; final AtomicReference<Long> firstAssign = this.firstAssign ; final BlockingQueue<WorkerTracker> workingQueue = this.workingQueue ; final Log logger = this.logger ; final WorkerEventListenerChain chain = this.chain ; return new Runnable() { public void run() { long lastFlag = System.currentTimeMillis() ; while (isStartForWork()) { try { long current = System.currentTimeMillis() ; if (current - lastFlag > max && getIdleWorkers() > initial) { fireWorker(); lastFlag = System.currentTimeMillis() ; } long startTime = firstAssign.get() ; while (startTime == 0L) { Thread.sleep(max); startTime = firstAssign.get() ; } final long interval = current - firstAssign.get(); if (interval >= max) { WorkerTracker tracker = workingQueue.poll(); if (tracker != null) { Worker worker = tracker.getWorker(); logger.warn("the worker["+worker.getId()+"] is overdue, remove from working queue."); removeFromWorkingQueue(worker); chain.onEvent(worker, WorkerEvent.OVERDUE); } } if (max > interval) { Thread.sleep(max - interval); } } catch (InterruptedException e) { String msg = "the worker broker is interrupted, " + "engine is still start for work[" + isStartForWork() + "]."; logger.warn(msg); } } } }; } /* * * @see com.oocllogistics.comp.workengine.worker.WorkerBroker#getMaxWorkers() */ public int getMaxWorkers() { return getConfiguration().getMaxNumberofWorkers() ; } /* * @see * com.oocllogistics.comp.workengine.worker.WorkerBroker#hireWorker(com.oocllogistics.comp.workengine.worker.Worker) */ public boolean hireWorker(Worker worker) { if (worker == null || !worker.isAvailable() || worker.isStartForWork()) { return false; } if (worker instanceof StandardWorker) { logger.info("hire a new worker[id="+worker.getId()+"] into queue."); return returnBack(worker); } if (worker instanceof SpecialWorker) { if (agent.get() == null) { agent.set(worker); logger.info("hire a special worker[id="+worker.getId()+"] as backup."); return true ; } } return false ; } public void notifyWorkerEvent(Worker worker, WorkerEvent event) { boolean isReturned = false ; switch (event) { case CANCEL_WORK: isReturned = returnBack(worker); break; case OVERDUE: isReturned = returnBack(worker); break; case SUBMIT_TASK: isReturned = returnBack(worker); break; default: isReturned = true ; break; } if (!isReturned) { logger.warn("return back worker[id="+worker.getId()+"] failed."); } chain.onEvent(worker, event); } /* * * @see com.oocllogistics.comp.workengine.impl.ControlSwitcher#postInitial() */ public void postInitial() { int init = getConfiguration().getInitNumberofWorkers(); int total = getConfiguration().getMaxNumberofWorkers(); sema = new Semaphore(total); queue = new ArrayBlockingQueue<Worker>(total); workingQueue = new ArrayBlockingQueue<WorkerTracker>(total); Collection<Worker> workerList = factory.createWorkers(init, WorkType.STANDARD_WORKER); for (Worker worker : workerList) { worker.setWorkerEventNotifier(this); queue.add(worker); } if (getConfiguration().useBackupWorker()) { Worker worker = factory.createWorker(WorkType.AGENT_WORKER); worker.setWorkerEventNotifier(this); agent.set(worker); } logger = getConfiguration().getLog() ; chain.setService(getEngine().getExecutorService()); } void removeFromWorkingQueue(Worker worker) { if (worker == null) { return ; } synchronized (worker) { WorkerTracker tracker = new WorkerTracker(worker, 0L); if (workingQueue.remove(tracker)) { if (tracker != null) { firstAssign.set(tracker.getStartWorkTime()); } else { firstAssign.set(0L); } if (logger.isDebugEnabled()) { logger.info("remove the worker[id="+worker.getId()+"] from working queue."); } } else { logger.warn("failed to remove the worker[id="+worker.getId()+"] from working queue."); } } } /* * * @see com.oocllogistics.comp.workengine.worker.WorkerBroker#require(boolean) */ public Worker require(boolean overdue) { Worker worker = null ; checkWhetherToAddWorker(); try { if (overdue) { worker = queue.poll(getConfiguration().getLatencyTimeForWorker(), TimeUnit.SECONDS); } else { worker = queue.take(); } worker = checkWhetherToUseBackupWorker(worker); sema.acquire() ; WorkerTracker tracker = new WorkerTracker(worker, System.currentTimeMillis()); if (workingQueue.offer(tracker)) { firstAssign.compareAndSet(0L, tracker.getStartWorkTime()); if (logger.isDebugEnabled()) { logger.info("put worker[id="+worker.getId()+"] into working queue."); } } else { String msg = "faile to put worker[id="+worker.getId()+"] into working queue. " + "It might cause a worker missing."; logger.warn(msg); chain.onEvent(worker, WorkerEvent.MISSED); } return worker ; } catch (InterruptedException e) { logger.warn("find interrupted exception when requiring a worker.", e); return agent.get() ; } } private void checkWhetherToAddWorker() { synchronized (queue) { if (queue.size() == 0 && sema.availablePermits() > 0) { int size = sema.availablePermits() >= 2 ? 2 : sema.availablePermits() ; logger.info("the worker queue is empty, will employ "+size+" workers."); Collection<Worker> workerCol = factory.createWorkers(size, WorkType.STANDARD_WORKER); for (Worker wk : workerCol) { wk.setWorkerEventNotifier(this); } queue.addAll(workerCol); } } } private Worker checkWhetherToUseBackupWorker(Worker worker) { if (worker == null) { if (getConfiguration().useBackupWorker()) { logger.warn("can not find any avaliable worker in queue, will use backup worker."); return agent.get() ; } return null ; } return worker ; } /* * @see * com.oocllogistics.comp.workengine.worker.WorkerBroker#returnBack(com.oocllogistics.comp.workengine.worker.Worker) */ public boolean returnBack(Worker worker) { if (worker == null) { return false; } //remove from working queue first. removeFromWorkingQueue(worker); boolean succeed = false ; synchronized (worker) { boolean exist = queue.contains(worker); if (exist) { logger.warn("the worker[id="+worker.getId()+"] is existing in worker queue."); } else { succeed = queue.offer(worker); } } if (succeed) { sema.release(); if (logger.isDebugEnabled()) { logger.info("succeed to put worker[id="+worker.getId()+"] in queue."); } return true ; } logger.warn("return back the worker[id="+worker.getId()+"] failed."); return false ; } public void setWorkerFactory(WorkerFactory factory) { this.factory = factory; } public int getWorkingWorkers() { return workingQueue.size(); } }
最后是TaskReportQueueImpl, 在这里,其实就是使用Java自带的线程池来实现的
public class TaskReportQueueImpl extends ControlSwitcher implements TaskReportQueue { private Log logger ; private BlockingQueue<TaskExecutionReport> queue ; private ExecutorService service ; public TaskReportQueueImpl() { super(); } @Override public Runnable getManagementThread(final WorkConfiguration config) { final Log logger = this.logger ; final BlockingQueue<TaskExecutionReport> queue = this.queue ; final ExecutorService service = this.service ; final TaskChainProcessor processor = new TaskChainProcessor(); processor.setEngine(getEngine()); return new Runnable(){ public void run() { while(isStartForWork()) { try { final TaskExecutionReport report = queue.take(); final String category = report.getTask().getCategory(); report.getTask().setReportTime(DateFormatter.now()); Runnable handler = new Runnable(){ public void run() { TaskProcessDescriptor desc = null ; desc = getEngine().getTaskCategoryInformation(category); if (desc == null) { logger.warn("the task category["+category+"] doesn't register."); return ; } if (StringUtils.isNotEmpty(report.getTask().getGroup())) { processor.process(report); } desc.getResultProcessor().process(report); } }; service.execute(handler); } catch (InterruptedException e) { String msg = "the task report queue is interrupted, " + "engine is still start for work["+isStartForWork()+"]."; logger.warn(msg); } } } }; } public void postInitial() { WorkConfiguration config = getConfiguration() ; queue = new ArrayBlockingQueue<TaskExecutionReport>(config.getResultQueueCapacity()); service = getEngine().getExecutorService() ; logger = getConfiguration().getLog() ; } /* * @see * com.oocllogistics.domestic.common.worktask.TaskReporter#submitReport(com.oocllogistics.domestic.common.worktask * .TaskExecutionReport) */ public void submitReport(TaskExecutionReport report) { if (!queue.offer(report)) { Task task = report.getTask() ; logger.warn("fail to submit the task report " + task); } } public Iterator<TaskExecutionReport> iterator() { TaskExecutionReport[] reports = queue.toArray(new TaskExecutionReport[queue.size()]); return Arrays.asList(reports).iterator(); } public int capacity() { return getConfiguration().getResultQueueCapacity(); } public int size() { return queue.size(); } }
核心的接口和实现贴出来了,希望对那些想学怎么写线程池的童鞋能有所帮助。
测试运行最多的时候是25个客户线程(负责提交任务),20个工作线程(负责处理任务),5个任务结果处理线程以及3个管理线程。
经过测试,下面这些测试用例是可通过的, CPU能保持在5%以下。并且Task在队列中等待和处理时间基本为0.
//SMALLEST -- 500ms //SMALLER -- 1000ms //NORMAL -- 2000ms //BIGGER -- 5000ms //BIGGEST -- 5000ms //test time unit is second public void testStandardPerformanceInMinute() { concurrentTaskThread = 10 ; totalTasks.put(Category.SMALLEST, 400); totalTasks.put(Category.SMALLER, 350); totalTasks.put(Category.NORMAL, 250); testTime = 60 ; testPerformance(); } public void testAdvancePerformanceIn2Minute() { concurrentTaskThread = 20 ; totalTasks.put(Category.SMALLEST, 300); totalTasks.put(Category.SMALLER, 300); totalTasks.put(Category.NORMAL, 200); totalTasks.put(Category.BIGGER, 100); totalTasks.put(Category.BIGGEST, 100); testTime = 120 ; testPerformance(); } public void testPerformanceUnderHugeData() { concurrentTaskThread = 25 ; totalTasks.put(Category.SMALLEST, 1500); totalTasks.put(Category.SMALLER, 500); totalTasks.put(Category.NORMAL, 500); testTime = 120 ; testPerformance(); }
欢迎大家讨论....
评论
1. JDK的ExecutorService中的线程池只是提供了一些基础的实现,进入线程池的任务一般有两种行为:阻塞或者激活新的线程,前者是对于 fixedThreadPool而言,而后者是对于cachedTreadPool。而项目需要的是一个具有伸缩性的。这里包括两个方面,一个是伸缩性的工作线程池(可以根据情况对线程池进行自我调节),二是伸缩性的任务队列(具有固定的大小,多余的任务会被暂时转储,而队列空闲至一个阈值时,从恢复转储的任务)。Commons-pool其实实现了第一个需求,但是它在设计上是存在一些问题,并不太适合用于线程池的管理(改篇可以再行讨论)。
2. 对于任务队列好,对于工作线程好,一个具有良好设计组件的前提是还有这么几个需求的:它自身是可以被Audit,也就是它的性能,实际工作质量是可以被检查和评估的;它的行为是可以被扩展的;它必须是健壮的并且可控的。所以,除了生产-消费者的线程池外,还必须有一些管理线程,它们必须能够良好地反馈和控制线程池;其次对于整个组件的活动必须定义一套事件以及事件监听机制,其目标一是能做到组件状态的自省,二是能提供客户端的扩展(比如实现动态任务链等)。
不要用Executors直接用ThreadPoolExecutor
继承一下可以实现统计的需求
然后再定制一下参数
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
是说我呢?还是说12楼的童鞋啊? 12楼的童鞋确实是夸张了....
因为现在做的是公司内部的组件,不能直接提供。我和几个朋友打算整理出一些东西做成开源的组件,有兴趣可以参考使用。
没错是响应,如果包括业务处理那是不可能的(这里的响应是指 服务器接受请求并且返回一个任务令牌给客户端)
说实话也只有AIX的小型机能做到这点 但是AIX的小型机器对BYTE的高低位读取和其他操作系统有区别 所以在AIX下用字符(不传BYTE传字符) 这点可能对性能有一定的影响 其他的操作系统很难做到 吞吐几千万的 如果用WIN 大概没秒最多也就3W(短连接情况下) 超过3W的时候 一般WIN会拒绝连接 信道回收严重不足 在LINUX下可以改下配置 达到每秒十几到几十W左右 但是要改配置(2.6的内核才行)
关于数据库的批量操作 确实是头疼的问题,要具体分析 特别是在高并非的情况下.
关于你的第七楼的回复,我明天详细给你回复。 不过没小时几千万的请求,平均每秒有将近3w的请求,如果一个请求在100ns,等于有3000个并发线程,理论上来说在单机上也是存在的,不过绝对保证不是不是完整的消息处理(只能说是响应)。(我们用TIBCO的EMS,在几百万的小型机上也就是跑能几百万的消息处理)
1.说句实话 要做到线程和业务的完全分离不太可能 比如每个线程要进行1次数据库操作 每次操作时间为1秒 你同时并发100个线程 你就要同时拥有100个连接 这时候批量很重要 实践证明我们把数据库操作进行委托后 吞吐率是几何数的增长。
2.如果业务有重试的操作 那么是否考虑在队列中的任务有会被重复执行的情况 ,比如 A任务失败, 业务把A放回 队列 --》 A在队列中等待 。 以此类推 如果队列中同时存在大于或等于2个以上A任务的时候 是否存在A会被重复的正确执行。
综上,个人认为线程池是死的 ,但是线程池的工作模式是活的。你可以让所有工人在包工头的控制下进行工作 也可以让所以工人按照一定的规则进行存活。前者一切可控,后者灵活多变。前者可能要颁发一般宪法 后者也许只要简单约定。
1. 根据资源情况调节工作线程的数量,合理利用资源。这在业务系统中非常重要。
2. 管理超时或者异常的工作线程(当某个工作线程执行任务超过一段时间,可以异常处理甚至强制回收),防止资源泄漏。
3. 工作线程池的代理,对于引擎来说,它所关心的是需要工人来处理任务,它丝毫不关心工人来自哪里,从何而来,执行完任务后又如何重新使用。
就是 包工头--》工人--》包工头 和 包工头--》包工头--》 一直包工头模式的区别。
程池模式一般分为两种:L/F领导者与跟随者模式、HS/HA半同步/半异步模式。
HS/HA 半同步/ 半异步模式 :分为三层,同步层、队列层、异步层,又称为生产者消费者模式,主线程处理I/O事件并解析然后再往队列丢数据,然后消费者读出数据进行应用逻辑处理;
优点:简化编程将低层的异步I/O和高层同步应用服务分离,且没有降低低层服务性能。集中层间通信。
缺点:需要线程间传输数据,因此而带来的动态内存分配,数据拷贝,语境切换带来开销。高层服务不可能从底层异步服务效率中获益。
L/F 领导者跟随者模式 :在LF线程池中,线程可处在3种线程状态之一: leader、follower或processor。处于leader状态的线程负责监听网络端口,当有消息到达时,该线程负责消息分离,并从处于 follower状态中的线程中按照某种机制如FIFO或基于优先级等选出一个来当新的leader,然后将自己设置为processor状态去分配和处理该事件。处理完毕后线程将自身的状态设置为follower状态去等待重新成为leader。在整个线程池中同一时刻只有一个线程可以处于leader 状态,这保证了同一事件不会被多个线程重复处理。
缺点:实现复杂性和缺乏灵活性;
优点:增强了CPU高速缓存相似性,消除了动态内存分配和线程间的数据交换。
两种模式性能分析:
L/F模式处理一个消息的时间为多路分离、分配、处理的时间,加上线程管理时间,LF中多个线程共享一个事件源,所以,需要协调它们间的行为,即有同步开销,L/F同步开销仅为申请/释放锁的开销,在LF处理请求过程中并不需要线程上下文切换,但是在线程由follower成为leader时需要进行线程上下文切换,所以当两个请求同时到达时,这种上下文切换会影响第二个请求的处理时间,也会带来一定的上下文开销。
T(L/F)=T(多路分离)+T(分配)+T(处理)+T(同步)+T(上下文)
HS/HA模式监听线程和工作线程间通过一个消息队列来交换数据。这会带来数据传递开销,。同时,监听线程和工作线程都需要去访问消息队列,造成了资源的竞争,需要额外的同步机制来协调他们的行为,包括监听线程获取和释放资源锁,对应的工作线程获取和释放资源锁,以及监听线程在将一个请求放入队列后通知工作线程带来的开销,我们称此为同步开销,HS/HA模式的同步开销大于L/F的同步开销,。一个请求由监听线程负责放入消息队列,但是却由工作线程来处理,所以,每个请求都会造成一次线程上下文切
换,由此带来的开销我们称为上下文开销。
T (H/H)=T(多路分离)+T(分配)+T(处理)+T(同步)+T(数据传递)+T(上下文)
从上面分析可以看出没有并发情况下L/F模式线程池模式性能优于HS/HA模式。
并发性能分析:
T(多路分离)、T(分配):LF和HH中把每一个消息的到来当作一个事件来处理。事件分配所做的工作是在一个事件处理器注册表中为一个事件查找事件处理器。这一步骤花费的时间随着当前注册的事件处理器的个数变化。当线程池接受用户连接请求后会为每一个连接注册一个事件处理器,所有通过该连接发来的请求都将由同一个事件处理器来处理。而事件处理器表采用一个平衡二叉树来实现。因此,事件分配的时间可以认为是随着并发用户数的增大而增大;
T(处理)处理消息和管理线程所需的时间都不受并发用户数的影响。
T(线程管理),多线程带来的线程管理开销只会随着线程池中线程数而变化,相对固定。
LF和HH的吞吐量会随着并发用户数的增加而增加。当并发用户数达到一定数量时,CPU成为系统瓶颈,此后增大并发用户数不仅不能增加并发处理的请求个数,反而会加大多路分离和分配的时间,从而使得系统吞吐量下降。
最佳性能时线程线:
随着线程数的增多吞吐量不断增大,当达到最大值后有一个短暂的保持阶段,此后继续增大线程数反而会使得吞吐量减小。而且当请求类型为计算密集型时线程数对
HH 的吞吐量的影响并不是很明显。原因是HH线程池在增加线程数时线程管理开销也有较大幅度的增加。因此,通过增大线程数来改善系统性能对HH来说并不是一种有效的方法。
1.说句实话 要做到线程和业务的完全分离不太可能 比如每个线程要进行1次数据库操作 每次操作时间为1秒 你同时并发100个线程 你就要同时拥有100个连接 这时候批量很重要 实践证明我们把数据库操作进行委托后 吞吐率是几何数的增长。
2.如果业务有重试的操作 那么是否考虑在队列中的任务有会被重复执行的情况 ,比如 A任务失败, 业务把A放回 队列 --》 A在队列中等待 。 以此类推 如果队列中同时存在大于或等于2个以上A任务的时候 是否存在A会被重复的正确执行。
综上,个人认为线程池是死的 ,但是线程池的工作模式是活的。你可以让所有工人在包工头的控制下进行工作 也可以让所以工人按照一定的规则进行存活。前者一切可控,后者灵活多变。前者可能要颁发一般宪法 后者也许只要简单约定。
1. 它是工作线程池的一个代理,它的存在可以隔离工作线程的实现方式。对于任务队列来说,它丝毫不关心工作线程的具体情况及维护。
2. 它本身就是一个线程池,它管理者一批工作线程。每个工作线程都是独立可重用的。正因为工作线程是独立,那么对于线程池本身来说,是需要一个管理线程来管理和维护的。管理线程与工作线程的分离,可以保证功能的专一性,也同时提高容错性。如果没有管理线程,又如何调度工作线程(包括回收、监听、根据情况创建调节)。
至于你说有多个WorkerBroker,不清楚是否指Broker chain的概念,但如果那样,只是增加了对于任务队列或者WorkEngine来说的复杂度。对于他们来说,是丝毫不需要关心这种细节的。同时,这种关系其实丢弃了作为工作线程的独立性(它必须知道和关联其他的工作线程),这种设计非常容易出现资源泄漏。
最后关于你说的并发,会有多个客户程序提交任务,而多个工作线程又是独立的收取、执行任务,并通知自己是否有效工作(一个工作线程是无法同时执行也不应该同时执行多个任务的,所以必须在任务执行完要反馈到线程池中),这些都是需要同步控制的。
其实关于池的实现或者概念,在java本身的代码中或者commons-pool中都有不错的范例,可以好好看看。
抱歉,我不是针对你的线程才这么说。
水位的保护 我的意思是说线程池能够使用的资源应该有一个水位,其实你已经有了。
和业务松耦合 好的线程模式可以解决这个问题, 线程的借出和归还应该有统一的人操作 不应该涉及要使用线程的业务 所以你一定会提供一个业务的切入点
凤舞凰扬:实不相瞒,你提供的这个线程池只是自己写的练手Sample程序,存在的问题蛮大的,连最起码的并发控制都看不到。我还是建议你好好仔细看看我的代码吧,我想你应该看得到区别的。
好像不存在并发吧,这种线程模型干嘛要并发呢。。。
你的代码设计的不错 但是就是包工头指定工作去跑这段 我觉得还可以优化。。
如果每个人都可以做包工头, 包工头只是指定下一个包工头 那么我为什么又要去做并发控制 不用我控制 它自己就跑的很好了。。。
所以一堆的管理线程其实是带来麻烦的根源 个人觉得最好的线程是不需要人管理的 自己就能够管理自己 每个线程平等 只是元的一个集合而已 你只要给它生存规则
其实你能否考虑下 由包工头去找一个工人做包工头 然后包工头自己去做任务。
状态大概是 包工头(A)--》找一个可用工作做包工头(B)--》包工头(A)跑业务
凤舞凰扬:呵呵,任务队列和工作线程池本身就是分开的。
其实可以优化的是,应该由Engine去管理任务与工作线程的关系,而不是在TaskQueueImpl直接从Broker获取Worker
在我看来线程池几点是值得注意的
1.丢失问题 是线程主动 去跑业务 还是业务去借线程
凤舞凰扬:不会有任何丢失,只是在最初的方案中始终有一个等待的工作线程(Worker), 其实可以稍微改动一下,也就是将取任务放在获取工作线程前。不过先后与否,都不会有任何任务或者工作线程的丢失(如果丢失了,那就是大事了,呵呵)。
2.竞争问题 有了一个任务 线程池是否能决定有谁去做 而不会发生争抢着做的情况
凤舞凰扬:有了一个任务,然后是通过WorkerBroker去require一个Worker,现实的例子就像来了个任务,包工头分配一个工人去做一样。所以不会有任何争抢,当然了,WorkerBroker本身也是一个管理线程。
凤舞凰扬:会有死锁问题么?
凤舞凰扬:你看到任何业务的东东了么?
凤舞凰扬:不太清楚什么叫水位
凤舞凰扬:容错机制还算
凤舞凰扬:.......
凤舞凰扬:所以设计了事件和监听机制
凤舞凰扬:不清楚什么中间结果? 理论上来说,有四个地方可以嵌入数据库的操作,任务的持久化、任务结果的处理,工作线程的事件监听、任务的事件监听。
凤舞凰扬:使用MQ是个不错的主意,不过注意两点:1.在大企业的应用中,任何外购组件(商用的好,免费的好),都必须经过POC(概念验证)。2. 如果没有用EJB,MQ更多的是消息处理引擎,而不是任务处理(这也是MDB存在的价值)。其实我们的实践是系统间的集成通信包括了使用基于JMS的TIBCO EMS,而这个任务组件只是系统内部。当然了,spring的batch也是个值得替代的东东。
凤舞凰扬:数据库的处理对于任务工作池来说是外部配属的东西,不应该在组件本身中考虑。
凤舞凰扬:这个问题好像不是说我的,呵呵
这个线程池简单了点 但是模型不错
http://www.iteye.com/topic/451240
凤舞凰扬:实不相瞒,你提供的这个线程池只是自己写的练手Sample程序,存在的问题蛮大的,连最起码的并发控制都看不到。我还是建议你好好仔细看看我的代码吧,我想你应该看得到区别的。
在我看来线程池几点是值得注意的
1.丢失问题 是线程主动 去跑业务 还是业务去借线程
2.竞争问题 有了一个任务 线程池是否能决定有谁去做 而不会发生争抢着做的情况
3.死锁问题 死信通知很重要 监控这块
4.和业务松耦合
5.水位的保护
6.容错
7.线程的异步和同步
8.反馈机制
9.中间结果等缓存。。例如数据库操作的批量 日志的批量
个人觉得任务队列可以放到MQ或EJB中 放在中间件的组件中总比放在你包含线程池的应用中好
同时非即时性的东西如数据库日志 可以考虑用缓存批量入库
个人总觉得 线程池 最好不要和数据库挂钩 就算你用了数据库连接池 但是数据库的使用会成为线程池处理性能的瓶颈 是否可以考虑将线程池的处理结果缓存起来 通过另外一个线程进行批量操作
胡乱将几句 见笑了。
这个线程池简单了点 但是模型不错
http://www.iteye.com/topic/451240
相关推荐
在实际应用中,生产者-消费者模型可能会结合其他设计模式,如单例模式用于保证缓存对象的唯一性,工厂模式用于创建线程或者任务,代理模式用于添加额外的功能或控制。同时,还需要考虑线程池的管理,避免大量线程的...
TaskDispatcher,一个基于生产者消费者模式的框架,为开发者提供了一种灵活且强大的解决方案。本文将深入探讨TaskDispatcher的设计理念、核心组件以及实际应用场景,旨在帮助读者理解和运用这一工具。 一、生产者...
在这个背景下,基于生产-消费者模式的任务异步线程池设计旨在创建一个更灵活、可扩展且具备审计能力的线程池实现。生产-消费者模型中,生产者负责创建任务并放入任务队列,消费者则负责从队列中取出任务并执行。这种...
综上所述,"消息分发框架(基于JAVA阻塞队列实现、生产者消费者模型)"是一个关键的并发处理组件,通过Java提供的并发工具和设计模式,实现了高效、稳定的消息处理。在实际应用中,需要根据业务需求进行适当的性能...
**定义:**生产者-消费者模式是一种用于解决生产者和消费者之间如何协同工作的设计模式。在这个模式中,生产者负责生成数据并将其放入缓冲区,而消费者则从缓冲区取出数据进行处理。 **关键要素:** - 生产者:负责...
- 生产者消费者模式 - 读写锁模式 - 基于模板方法的设计模式 - **高级主题** - ThreadLocal的工作原理与应用场景 - Fork/Join框架介绍 - CompletableFuture的使用案例 - Stream API与并行流 - 并发安全的...
1. **生产者消费者模式**:这种模式用于处理数据生成和消费的问题。生产者线程负责生成数据,而消费者线程则负责处理这些数据。Java中可以使用`BlockingQueue`来实现这一模式,保证线程间的同步和通信。 2. **守护...
- **生产者-消费者模式**:通过将任务生产者和消费者分离,可以有效地处理大量并发任务。 - **工作窃取模式**:允许空闲线程从其他线程的队列中窃取任务,从而充分利用所有可用资源。 - **管道-过滤器模式**:将任务...
1. **生产者消费者模式**:该模式基于`BlockingQueue`,例如`ArrayBlockingQueue`,用于在生产数据和消费数据的线程之间同步。生产者将数据放入队列,消费者则从队列中取出数据。`offer()`, `take()` 和 `poll()` ...
3. **生产者/消费者模型**:在生产者/消费者模式中,生产者负责产生数据,而消费者则消费这些数据。Active-Object 模式非常适合于这种类型的架构。 #### 五、示例分析 以通信网关为例,网关作为供应商和消费者之间...
1. **生产者-消费者模式**:通过队列解决生产者和消费者之间的耦合问题。生产者负责生成数据并放入队列中,消费者负责从队列中取出数据进行消费。 2. **读写锁模式**:读操作和写操作对数据的访问需求不同,通过...
阻塞队列常用于生产者-消费者模式中,能够实现线程间的协作,保证资源的同步和一致性的操作。 并发队列是支持多线程并发访问的队列。在多线程环境中,多个线程可能会同时对同一个队列进行操作,如同时入队或出队,...
- 生产者消费者模式:这是并发编程中的一个经典模式。生产者负责产生数据并存入阻塞队列,而消费者则从队列中取出数据进行处理。由于阻塞队列的特性,可以保证消费者不会在队列为空时去获取数据,也不会在队列满时...
10. **设计模式**:生产者消费者模式在这里非常适用,生产者(通常是用户界面或其他组件)将任务放入队列,而消费者(后台线程)负责取出并执行任务。 综上所述,这个项目涉及了C#多线程编程的核心概念和技术,包括...
- **生产者消费者模式**:利用`BlockingQueue`实现生产者与消费者的解耦。 - **定时任务**:使用`ScheduledExecutorService`实现定时任务的调度。 #### 七、最佳实践 - **避免过多的线程创建**:频繁地创建和销毁...
1. **生产者消费者模式**:这是一种典型的线程协作模式,用于分离数据的生产和消费。Java中可以使用BlockingQueue(阻塞队列)实现,如ArrayBlockingQueue或LinkedBlockingQueue。生产者将元素放入队列,而消费者则...
本文设计的监控传输子系统采用了半同步/半异步的模式,将网络I/O与磁盘I/O分离,利用任务池进行缓冲,以此缓解网络和磁盘I/O之间的冲突。同时,系统运用线程池动态管理算法,根据任务的闲置情况和系统资源利用状况,...
**4)线程池**:基于C++的生产者消费者模式的并发开发,具体技术运用如下:\ **①线程池底层结构**:线程池创建相当于消费者,队列添加相当于生产者,通过vector维护线程池,通过queue<function<>>维护任务队列;...
- **生产者消费者模式**:使用`BlockingQueue`实现线程间的异步通信,如`ArrayBlockingQueue`、`LinkedBlockingQueue`等。 - **单例模式**:在多线程环境下,确保类只有一个实例,常见的有双重检查锁定(DCL)和...
- 生产者消费者模式:使用`BlockingQueue`实现线程间的协作。 - 状态对象模式:用于管理并发访问的共享状态。 - 管道模式:线程间的通信,如`PipedInputStream`和`PipedOutputStream`。 8. **第八章:并发编程...