`
assertmyself
  • 浏览: 29567 次
  • 性别: Icon_minigender_1
  • 来自: 南京
文章分类
社区版块
存档分类
最新评论

通用任务分发框架(TaskDispatcher),基于生产者消费者模式

阅读更多
TaskDispatcher:通用的任务分发和处理框架,基于生产者消费者模式,底层使用阻塞队列实现。

如果需要使用生产者消费者模式,不需要再手写阻塞队列,只需要启动该服务,并编写对应的 process(处理器)就可以了。
除了使用简单之外,还提供了任务状态维护、处理结果追踪以及任务处理方式定制等功能。


核心代码如下


服务接口
package com.gbcom.ccsv3.transport.dispatcher;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;


/**
 * Contract of the task dispatcher: accepts tasks and exposes start/stop
 * lifecycle control.
 *
 * @author syz
 * @date 2014-9-30 14:29:52
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskDispatcherItf
 */
public interface TaskDispatcherItf {

	/**
	 * Starts the dispatcher.
	 */
	void start();

	/**
	 * Stops the dispatcher.
	 */
	void stop();

	/**
	 * Submits a task.
	 *
	 * @param task the task to submit
	 */
	void addTask(TaskBase task);

	/**
	 * Submits a task, optionally requesting unique-queue handling.
	 *
	 * @param task   the task to submit
	 * @param unique {@code true} to request unique-queue handling for this task
	 */
	void addTask(TaskBase task, boolean unique);
}




转发器实现
package com.gbcom.ccsv3.transport.dispatcher;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * Asynchronous task dispatcher implementing the producer/consumer model on top
 * of a bounded blocking queue.
 *
 * <p>
 * Producers hand tasks over through {@link #addTask(TaskBase, boolean)};
 * {@code THREAD_NUM} worker loops take tasks from the queue and pass them to
 * the {@link TaskProcessManager}. Duplicate task keys are allowed by default;
 * pass {@code keyUnique = true} to reject a task whose key is still pending.
 *
 * <p>
 * Processing is asynchronous: a caller that needs to synchronise on the
 * outcome has to provide a callback.
 *
 * <p>
 * Thread-safety: {@code addTask}, {@code start}, {@code stop} and all accesses
 * to the key map are guarded by this instance's monitor.
 *
 * @author syz
 * @date 2014-9-30 15:09:12
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskDispatcher
 */
public class TaskDispatcher implements TaskDispatcherItf {
	private static final Logger LOG = Logger.getLogger(TaskDispatcher.class);
	/** Number of worker loops submitted to the executor by {@link #start()}. */
	private static final int THREAD_NUM = 40;
	/** Hard capacity of the blocking queue. */
	private static final int BLOCK_QUEUE_MAX_SIZE = 10000;
	/** Queue size at which all queued tasks are discarded before a new one is added. */
	private static final int BLOCK_QUEUE_CLEAR_SIZE = 2000;

	/**
	 * Executor that runs the worker loops.
	 */
	private ExecutorService executor = null;

	private TaskProcessManager taskManager = null;

	/** Guarded by {@code this}. */
	private boolean isRunning = false;
	/**
	 * Bounded blocking queue holding the pending tasks.
	 */
	private BlockingQueue<TaskBase> taskQueue = new LinkedBlockingQueue<TaskBase>(
			BLOCK_QUEUE_MAX_SIZE);
	/**
	 * Keys of the unique tasks that are queued or being processed, mapped to
	 * the task instance that reserved the key. Guarded by {@code this}.
	 */
	private Map<String, TaskBase> keyMap = new HashMap<String, TaskBase>();

	private static class TaskDispatcherHolder {
		private static final TaskDispatcher INSTANCE = new TaskDispatcher();
	}

	/**
	 * Returns the singleton dispatcher; it is created and started on first use.
	 * 
	 * @return the shared TaskDispatcher
	 */
	public static TaskDispatcher getInstance() {
		return TaskDispatcherHolder.INSTANCE;
	}

	private TaskDispatcher() {
		init();
		start();
	}

	private void init() {
		isRunning = false;
		taskManager = new TaskProcessManager();
	}

	/**
	 * Queues a task for asynchronous processing.
	 *
	 * <p>
	 * When {@code keyUnique} is {@code true} and a task with the same key is
	 * still pending, the new task is rejected. When the queue has reached
	 * {@code BLOCK_QUEUE_CLEAR_SIZE}, all queued tasks are discarded (together
	 * with their key reservations) before the new task is added.
	 *
	 * @param task      the task to queue; {@code null} is logged and ignored
	 * @param keyUnique {@code true} to enforce uniqueness of the task key
	 */
	@Override
	public synchronized void addTask(TaskBase task, boolean keyUnique) {
		if (task == null) {
			LOG.error("can not add a null Task to Queue");
			return;
		}
		if (!isRunning) {
			LOG
					.error("TaskDispatcher  is not running, the Task below may not process");
		}
		if (LOG.isDebugEnabled()) {
			LOG.debug("add Task to Queue " + task.getTaskName() + " "
					+ task.getType());
		}
		boolean reserved = false;
		boolean queued = false;
		try {
			if (keyUnique) {
				// The task key defaults to the object's hash code but may be
				// set explicitly by the producer.
				if (keyMap.containsKey(task.getTaskKey())) {
					LOG.info("can not add Task to Queue,the ele exist!!!!"
							+ task.getTaskName() + " , size = "
							+ taskQueue.size());
					return;
				}
				keyMap.put(task.getTaskKey(), task);// reserve the key
				reserved = true;
			}
			LOG.info("add Task to Queue " + task.getTaskName() + " , size = "
					+ taskQueue.size());
			if (taskQueue.size() >= BLOCK_QUEUE_CLEAR_SIZE) {
				LOG.info(" *****cleart request Task***** trap queue size is more than "
								+ BLOCK_QUEUE_CLEAR_SIZE
								+ ";;  CLEAR BlockingQueue");
				discardQueuedTasks();
			}
			taskQueue.put(task);
			queued = true;
		} catch (InterruptedException e) {
			LOG.info("/******* add Task InterruptedException*********/");
			LOG.error("add Task to queue interrupted", e);
			LOG.info("/******* add Task InterruptedException  *********/");
			// Restore the flag so the caller can observe the interruption.
			Thread.currentThread().interrupt();
		} catch (Exception e) {
			LOG.error("Other Exception  ", e);
		} finally {
			if (reserved && !queued) {
				// The task never reached the queue, so no worker will release
				// its key; do it here or the key would be blocked forever.
				releaseKey(task);
			}
		}

	}

	/**
	 * Queues a task without a uniqueness check.
	 * 
	 * @param task
	 *            the task to queue
	 */
	@Override
	public synchronized void addTask(TaskBase task) {
		addTask(task, false);
	}

	/**
	 * Stops the dispatcher by interrupting the worker loops. Tasks still in
	 * the queue stay there.
	 */
	@Override
	public synchronized void stop() {
		if (executor != null) {
			executor.shutdownNow();
		}
		isRunning = false;
	}

	/**
	 * Starts the worker loops. Calling this while the dispatcher is already
	 * running is a no-op, so a second executor is never leaked.
	 */
	@Override
	public synchronized void start() {
		if (isRunning) {
			LOG.warn("task Dispatcher is already running, start request ignored");
			return;
		}
		executor = Executors.newCachedThreadPool();
		for (int i = 0; i < THREAD_NUM; i++) {
			executor.execute(new DispatcherTask());
		}
		isRunning = true;
		LOG.info("task Dispatcher task start  , current thread size =  "
				+ THREAD_NUM);

	}

	/**
	 * Removes every queued task and releases the key each of them reserved.
	 * Must be called with this instance's monitor held.
	 */
	private void discardQueuedTasks() {
		TaskBase dropped = taskQueue.poll();
		while (dropped != null) {
			releaseKey(dropped);
			dropped = taskQueue.poll();
		}
	}

	/**
	 * Releases the uniqueness reservation held by the given task, if any. The
	 * identity check keeps a task that was added without the uniqueness flag
	 * from releasing a reservation owned by another task with the same key.
	 *
	 * @param task the task whose reservation should be released
	 */
	private synchronized void releaseKey(TaskBase task) {
		String key = task.getTaskKey();
		if (keyMap.get(key) == task) {
			keyMap.remove(key);
		}
	}

	/**
	 * Worker loop: takes tasks from the queue and processes them until the
	 * thread is interrupted.
	 */
	class DispatcherTask implements Runnable {

		/**
		 * Runs the take/process loop.
		 */
		@Override
		public void run() {
			while (!Thread.currentThread().isInterrupted()) {
				// Reset on every iteration so the finally block never sees the
				// task of a previous iteration.
				TaskBase bean = null;
				try {
					long begin = System.currentTimeMillis();
					bean = taskQueue.take();
					taskManager.taskProcess(bean);
					LOG.info("process Task  success, thread="
							+ Thread.currentThread().getName()
							+ "  ;spend time :total= "
							+ ((System.currentTimeMillis() - begin) / 1000)
							+ "s  || the queue size is not actually:"
							+ taskQueue.size());
				} catch (InterruptedException e) {
					LOG
							.info("/******* Task Dispatcher  InterruptedException*********/");
					LOG.error("Task Dispatcher thread interrupted ;; tread = "
							+ Thread.currentThread().getName(), e);
					LOG
							.info("/******* Task Dispatcher  InterruptedException*********/");
					Thread.currentThread().interrupt();
					break;
				} catch (Exception e) {
					// A failing task must not kill the worker loop.
					LOG.error("Task Dispatcher thread exception", e);
				} finally {
					// bean is null when take() was interrupted before a task
					// was obtained.
					if (bean != null) {
						releaseKey(bean);
					}
				}
			}

		}

	}

}



处理器管理器,注册指定任务的处理器
TaskProcessManager
package com.gbcom.ccsv3.transport.dispatcher;

import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.transport.dispatcher.process.CfgTplTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.DBTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.HTTPTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.SNMPTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.TaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.UdpTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.task.CfgTplTask;
import com.gbcom.ccsv3.transport.dispatcher.task.DBTask;
import com.gbcom.ccsv3.transport.dispatcher.task.HTTPTask;
import com.gbcom.ccsv3.transport.dispatcher.task.SNMPTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;
import com.gbcom.ccsv3.transport.dispatcher.task.UdpTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskType;

/**
 * Registry that maps a {@link TaskType} to the {@link TaskProcess} handling
 * tasks of that type, and routes tasks to the registered processor.
 *
 * <p>
 * Registry accesses made through this class are synchronised on this
 * instance because {@link #taskProcess(TaskBase)} is called from the
 * dispatcher's worker threads while {@link #put(TaskBase, TaskProcess)} and
 * {@link #remove(TaskBase)} may be called from other threads. The processor
 * itself runs outside the lock.
 *
 * @author syz
 * @date 2014-9-30 15:07:25
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskProcessManager
 */
public class TaskProcessManager{
	/**
	 * Logger.
	 */
	protected static final Logger LOG = Logger.getLogger(TaskProcessManager.class);
	@SuppressWarnings("unchecked")
	private Map<TaskType,TaskProcess> taskProcessors = new HashMap<TaskType,TaskProcess>();

	/**
	 * Creates a manager with the default processors registered for the UDP,
	 * DB, HTTP, SNMP and CFGTPL task types.
	 */
	public TaskProcessManager(){
		taskProcessors.put(TaskType.UDP, new UdpTaskProcess<UdpTask>());
		taskProcessors.put(TaskType.DB, new DBTaskProcess<DBTask>());
		taskProcessors.put(TaskType.HTTP, new HTTPTaskProcess<HTTPTask>());
		taskProcessors.put(TaskType.SNMP, new SNMPTaskProcess<SNMPTask>());
		taskProcessors.put(TaskType.CFGTPL, new CfgTplTaskProcess<CfgTplTask>());
	}
	
	/**
	 * Creates a manager backed by the given registry. The map is used as-is
	 * (not copied); a {@code null} map yields an empty registry instead of a
	 * later NullPointerException.
	 *
	 * @param map processors keyed by task type, may be {@code null}
	 */
	@SuppressWarnings("unchecked")
	public TaskProcessManager(Map<TaskType,TaskProcess> map){
		if (map != null) {
			this.taskProcessors = map;
		}
	}
	/**
	 * Registers (or replaces) the processor for the type of the given task.
	 *
	 * @param task    task whose type selects the registry slot
	 * @param process processor to register for that type
	 */
	@SuppressWarnings("unchecked")
	public synchronized void put(TaskBase task, TaskProcess process){
		taskProcessors.put(task.getType(), process);
	}
	/**
	 * Removes the processor registered for the type of the given task.
	 *
	 * @param task task whose type selects the registry slot
	 */
	public synchronized void remove(TaskBase task){
		taskProcessors.remove(task.getType());
	}
	
	/**
	 * Looks up the processor for a task type under the registry lock.
	 *
	 * @param type task type
	 * @return the registered processor, or {@code null} if there is none
	 */
	@SuppressWarnings("unchecked")
	private synchronized TaskProcess lookup(TaskType type){
		return taskProcessors.get(type);
	}
	
	/**
	 * Hands the task to the processor registered for its type. A task whose
	 * type has no processor is logged and skipped.
	 *
	 * @param task the task to process
	 */
	@SuppressWarnings("unchecked")
	public void taskProcess(TaskBase task) {
		TaskProcess process = lookup(task.getType());
		if(process == null){
			// Previously such tasks vanished without any trace.
			LOG.warn("no TaskProcess registered for task type " + task.getType()
					+ ", task " + task.getTaskName() + " is not processed");
			return;
		}
		// Run outside the lock so workers do not serialise on the registry.
		process.process(task);
	}

}

以上是主体流程,以下是一些辅助 POJO 和处理相关的类。

结果数据
package com.gbcom.ccsv3.transport.dispatcher;

import java.io.Serializable;

/**
 * Outcome of processing one task: bundles the processed source object with a
 * success flag, the failure cause, a result payload and free-form extra
 * information.
 *
 * @param <T> type of the processed source object
 * @author syz
 * @date 2014-9-30 11:16:48
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskProResult
 */
public class TaskProResult<T> implements Serializable{

	/**
	 * serialVersionUID: long
	 */
	private static final long serialVersionUID = -6165298377784981884L;

	/** The source object that was processed. */
	private T origin;

	/** Result payload; the original author's note mentions "UDP:Message". */
	private Object result;

	/** Whether processing of the source succeeded. */
	private boolean successful;

	/** Cause of the failure when processing did not succeed. */
	private Throwable throwable;

	/** Free-form additional information about the outcome. */
	private Object extraInfo;

	/**
	 * Creates a successful result without a source object.
	 */
	public TaskProResult() {
		this(null);
	}

	/**
	 * Creates a successful result for the given source object.
	 *
	 * @param t the source object
	 */
	public TaskProResult(T t) {
		this(t, true, null);
	}

	/**
	 * Creates a result with every outcome attribute given explicitly.
	 *
	 * @param origin
	 *            the source object
	 * @param isSuccessful
	 *            whether processing of the source succeeded
	 * @param throwable
	 *            the exception that caused the failure, if any
	 */
	public TaskProResult(T origin, boolean isSuccessful, Throwable throwable) {
		this.origin = origin;
		this.successful = isSuccessful;
		this.throwable = throwable;
	}

	/**
	 * @return the source object
	 */
	public T getOrigin() {
		return origin;
	}

	/**
	 * @param origin
	 *            the source object to set
	 */
	public void setOrigin(T origin) {
		this.origin = origin;
	}

	/**
	 * @return the result payload
	 */
	public Object getResult() {
		return result;
	}

	/**
	 * @param result
	 *            the result payload to set
	 */
	public void setResult(Object result) {
		this.result = result;
	}

	/**
	 * @return {@code true} if processing succeeded
	 */
	public boolean isSuccessful() {
		return successful;
	}

	/**
	 * @param successful
	 *            the success flag to set
	 */
	public void setSuccessful(boolean successful) {
		this.successful = successful;
	}

	/**
	 * @return the failure cause, or {@code null}
	 */
	public Throwable getThrowable() {
		return throwable;
	}

	/**
	 * @param throwable
	 *            the failure cause to set
	 */
	public void setThrowable(Throwable throwable) {
		this.throwable = throwable;
	}

	/**
	 * @return the extra information
	 */
	public Object getExtraInfo() {
		return extraInfo;
	}

	/**
	 * @param extraInfo
	 *            the extra information to set
	 */
	public void setExtraInfo(Object extraInfo) {
		this.extraInfo = extraInfo;
	}

}



事件类型定义
基类
//off checkstyle
package com.gbcom.ccsv3.transport.dispatcher.task;

import java.util.Date;

/**
 * Base class of all tasks: carries the task key, name, type, state,
 * timestamps and the optional pre/post/processing hook classes.
 *
 * <p>
 * NOTE(review): per the original author's note, a task in the
 * {@link TaskState#QUEUE} state goes through all processing steps, while
 * tasks in any other state only get the post step. Confirm against the
 * processor implementation.
 *
 * @author syz
 * @date 2014-9-30 10:02:07
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.task.TaskBase
 */
public class TaskBase {
	/**
	 * Lifecycle state of a task, each with a numeric code.
	 */
	public enum TaskState{
		QUEUE(0),RUNNING(1),RUNNED(2),INVALID(3);
		
		private int value;
		TaskState(int i){
			this.value = i;
		}
		/**
		 * Maps a numeric code to its state. This is a static factory, so it
		 * can be called as {@code TaskState.valueOf(1)} without already
		 * holding a state instance.
		 *
		 * @param i numeric code
		 * @return the matching state; {@link #INVALID} for unknown codes
		 */
		public static TaskState valueOf(int i){
			switch(i){
			case 0:
				return TaskState.QUEUE;
			case 1:
				return TaskState.RUNNING;
			case 2:
				return TaskState.RUNNED;
			case 3:
			default:
				return TaskState.INVALID;
			}
		}
		/**
		 * Overwrites the numeric code of this constant. Kept for
		 * compatibility; enum constants are shared, so the change is visible
		 * everywhere and does not affect {@link #valueOf(int)}.
		 *
		 * @param value new numeric code
		 */
		public void setValue(int value) {
			this.value = value;
		}
		/**
		 * @return the numeric code of this state
		 */
		public int getValue() {
			return value;
		}
	}
	
	
	/**
	 * Kind of work a task represents; selects the processor.
	 */
	public enum TaskType{
		DB,UDP,HTTP,SNMP,CFGTPL
	}
	// Key identifying the task; a String that uniquely identifies the
	// underlying object is suggested. Required when the task is added to the
	// dispatcher with the uniqueness flag. Defaults to this object's hash code.
	private String taskKey = ""+hashCode();
	
	private String taskName;
	@SuppressWarnings("unchecked")
	private Class clzProc;
	@SuppressWarnings("unchecked")
	private Class postProc;
	@SuppressWarnings("unchecked")
	private Class preProc;
	private Date begin;
	private Date end;
	private TaskState state;
	private TaskType type;
	
	
	/** @return the task key (defaults to the object's hash code) */
	public String getTaskKey() {
		return taskKey;
	}
	/** @param tastObj the task key to set */
	public void setTaskKey(String tastObj) {
		this.taskKey = tastObj;
	}
	/** @return the task name */
	public String getTaskName() {
		return taskName;
	}
	/** @param taskName the task name to set */
	public void setTaskName(String taskName) {
		this.taskName = taskName;
	}
	/** @return the processing class, or {@code null} */
	@SuppressWarnings("unchecked")
	public Class getClzProc() {
		return clzProc;
	}
	/** @param clzProc the processing class to set */
	@SuppressWarnings("unchecked")
	public void setClzProc(Class clzProc) {
		this.clzProc = clzProc;
	}
	/** @return the begin timestamp, or {@code null} */
	public Date getBegin() {
		return begin;
	}
	/** @param begin the begin timestamp to set */
	public void setBegin(Date begin) {
		this.begin = begin;
	}
	/** @return the end timestamp, or {@code null} */
	public Date getEnd() {
		return end;
	}
	/** @param end the end timestamp to set */
	public void setEnd(Date end) {
		this.end = end;
	}
	/** @return the current state, or {@code null} if never set */
	public TaskState getState() {
		return state;
	}
	/** @param state the state to set */
	public void setState(TaskState state) {
		this.state = state;
	}
	/** @return the task type, or {@code null} if never set */
	public TaskType getType() {
		return type;
	}
	/** @param type the task type to set */
	public void setType(TaskType type) {
		this.type = type;
	}
	/** @param postProc the post-processing hook class to set */
	@SuppressWarnings("unchecked")
	public void setPostProc(Class postProc) {
		this.postProc = postProc;
	}
	/** @return the post-processing hook class, or {@code null} */
	@SuppressWarnings("unchecked")
	public Class getPostProc() {
		return postProc;
	}
	/** @param preProc the pre-processing hook class to set */
	@SuppressWarnings("unchecked")
	public void setPreProc(Class preProc) {
		this.preProc = preProc;
	}
	/** @return the pre-processing hook class, or {@code null} */
	@SuppressWarnings("unchecked")
	public Class getPreProc() {
		return preProc;
	}
	
}


一个实现类:UDP 任务
//off checkstyle
package com.gbcom.ccsv3.transport.dispatcher.task;


/**
 * UDP task: a {@link TaskBase} of type {@code UDP} carrying an item name, an
 * operation type and a JSON payload.
 *
 * @author syz
 * @date 2014-9-30 10:33:37
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.task.UdpTask
 */
public class UdpTask extends TaskBase{
	private OperType operType;
	private String item;
	private String json;
	
	/**
	 * Kind of UDP operation.
	 */
	public enum OperType{
		REQUEST,INFOR// per the original note: without a receive/response
	}
	
	/**
	 * Creates a queued UDP task without item and operation type.
	 */
	public UdpTask() {
		this(null, null);
	}
	/**
	 * Creates a queued UDP task for the given item. The item is stored on the
	 * task; it used to be dropped, leaving {@link #getItem()} {@code null}.
	 *
	 * @param item the item name
	 */
	public UdpTask(String item){
		this(item, null);
	}
	/**
	 * Creates a queued UDP task for the given item and operation type.
	 *
	 * @param item     the item name
	 * @param operType the operation type
	 */
	public UdpTask(String item, OperType operType) {
		super();
		setType(TaskType.UDP);
		setState(TaskState.QUEUE);
		this.item = item;
		this.operType = operType;
	}
	
	/** @return the JSON payload, or {@code null} */
	public String getJson() {
		return json;
	}
	/** @param values the JSON payload to set */
	public void setJson(String values) {
		this.json = values;
	}

	
	/** @return the item name, or {@code null} */
	public String getItem() {
		return item;
	}
	/** @param item the item name to set */
	public void setItem(String item) {
		this.item = item;
	}
	/** @return the operation type, or {@code null} */
	public OperType getOperType() {
		return operType;
	}
	/** @param operType the operation type to set */
	public void setOperType(OperType operType) {
		this.operType = operType;
	}


}



处理相关类


接口
package com.gbcom.ccsv3.transport.dispatcher.process;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;

/**
 * Processor of tasks of one kind.
 *
 * @param <T> type of task this processor accepts
 * @author syz
 * @date 2014-9-30 10:47:39
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.TaskProcess
 */
public interface TaskProcess <T>{

	/**
	 * Processes one task.
	 *
	 * @param task the task object
	 * @return the processing outcome wrapped in a {@link TaskProResult}
	 */
	TaskProResult<T> process(T task);

}



抽象类
package com.gbcom.ccsv3.transport.dispatcher.process;

import java.util.Date;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.common.exception.TaskProException;
import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import com.gbcom.ccsv3.transport.dispatcher.process.post.PostProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.post.PreProcess;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskState;

/**
 * Template for task processors. {@link #process(TaskBase)} runs a fixed
 * sequence: optional pre-hook, {@link #before()},
 * {@link #processCall(TaskProResult)}, {@link #after()}, optional post-hook.
 * Subclasses supply the three abstract steps.
 *
 * @param <T> type of task handled by the processor
 * @author syz
 * @date 2014-12-4 13:20:13
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.AbstractTaskProcess
 */
public abstract class AbstractTaskProcess<T extends TaskBase> implements TaskProcess<T> {
	protected static final Logger LOG = Logger.getLogger(AbstractTaskProcess.class);

	/**
	 * Step executed right before the main processing call.
	 */
	public abstract void before();

	/**
	 * Step executed right after the main processing call, whether or not it
	 * failed with an exception.
	 */
	public abstract void after();

	/**
	 * Main processing step.
	 *
	 * @param result outcome holder; its origin is the task being processed
	 * @throws TaskProException when processing fails
	 */
	public abstract void processCall(TaskProResult<T> result)throws TaskProException;

	/**
	 * Runs the whole processing sequence for one task.
	 *
	 * @param task the task to process
	 * @return the outcome, with the task as its origin
	 */
	@Override
	public TaskProResult<T> process(T task) {
		TaskProResult<T> outcome = new TaskProResult<T>();
		outcome.setOrigin(task);

		runPreHook(task);
		before();
		execute(task, outcome);
		after();
		runPostHook(outcome);

		return outcome;
	}

	/**
	 * Invokes the pre-hook for tasks in the QUEUE state; reflection failures
	 * while instantiating the hook are logged and otherwise ignored.
	 */
	private void runPreHook(T task) {
		try {
			if (TaskState.QUEUE == task.getState()) {
				preProcess(task);
			}
		} catch (InstantiationException e) {
			LOG.error("InstantiationException", e);
		} catch (IllegalAccessException e) {
			LOG.error("IllegalAccessException", e);
		}
	}

	/**
	 * Calls {@link #processCall(TaskProResult)} for tasks in the QUEUE state,
	 * then marks the task as RUNNED and the outcome as successful. Any
	 * exception marks the outcome as failed instead.
	 */
	private void execute(T task, TaskProResult<T> outcome) {
		try {
			if (TaskState.QUEUE == task.getState()) {
				processCall(outcome);
			}
			task.setState(TaskState.RUNNED);
			task.setEnd(new Date());
			outcome.setSuccessful(true);
			outcome.setExtraInfo("success");
			outcome.setThrowable(null);
		} catch (TaskProException e) {
			markFailed(outcome, "TaskProException", e);
		} catch (Exception e) {
			markFailed(outcome, "other", e);
		}
	}

	/**
	 * Logs the failure and records it on the outcome.
	 */
	private void markFailed(TaskProResult<T> outcome, String logMessage, Throwable cause) {
		LOG.error(logMessage, cause);
		outcome.setSuccessful(false);
		outcome.setExtraInfo(cause.getClass());
		outcome.setThrowable(cause);
	}

	/**
	 * Invokes the post-hook; reflection failures while instantiating the hook
	 * are logged and otherwise ignored.
	 */
	private void runPostHook(TaskProResult<T> outcome) {
		try {
			postProcess(outcome);
		} catch (InstantiationException e) {
			LOG.error("InstantiationException", e);
		} catch (IllegalAccessException e) {
			LOG.error("IllegalAccessException", e);
		}
	}

	/**
	 * Instantiates the task's pre-processing class, if one is set, and calls it.
	 *
	 * @param task the task
	 * @throws InstantiationException if the hook class cannot be instantiated
	 * @throws IllegalAccessException if the hook constructor is not accessible
	 */
	@SuppressWarnings("unchecked")
	private void preProcess(T task) throws InstantiationException, IllegalAccessException{
		Class<PreProcess> hookType = task.getPreProc();
		if (hookType == null) {
			return;
		}
		PreProcess hook = hookType.newInstance();
		hook.preProcess(task);
	}

	/**
	 * Instantiates the post-processing class of the outcome's task, if one is
	 * set, and calls it.
	 *
	 * @param result the outcome
	 * @throws InstantiationException if the hook class cannot be instantiated
	 * @throws IllegalAccessException if the hook constructor is not accessible
	 */
	@SuppressWarnings("unchecked")
	private void postProcess(TaskProResult<T>result ) throws InstantiationException, IllegalAccessException{
		Class<PostProcess> hookType = result.getOrigin().getPostProc();
		if (hookType == null) {
			return;
		}
		PostProcess hook = hookType.newInstance();
		hook.postProcess(result);
	}

}



一个实现:UdpTaskProcess
package com.gbcom.ccsv3.transport.dispatcher.process;

import com.gbcom.ccsv3.common.exception.TaskProException;
import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * Processor registered for UDP tasks. As shown here, the main processing
 * step is empty and the before/after steps only log.
 *
 * @param <T> type of task handled
 * @author syz
 * @date 2014-9-30 12:55:45
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.UdpTaskProcess
 */
public class UdpTaskProcess<T extends TaskBase> extends AbstractTaskProcess<T> {

	/**
	 * Logs that the before step was reached.
	 */
	@Override
	public void before() {
		LOG.info("UdpTaskProcess  --before---");
	}

	/**
	 * Main processing step; intentionally left without a body in this listing.
	 *
	 * @param result
	 *            outcome holder for the task
	 * @throws TaskProException
	 *             declared by the contract; never thrown here
	 */
	@Override
	public void processCall(TaskProResult<T> result) throws TaskProException {
	}

	/**
	 * Logs that the after step was reached.
	 */
	@Override
	public void after() {
		LOG.info("UdpTaskProcess  --after---");
	}

	/**
	 * Private helper placeholder; currently empty and unused.
	 *
	 * @author SunYanzheng
	 * @date 2014-9-30 12:59:57
	 * @version v1.0.0
	 * @see com.gbcom.smvc.dispatcher.process.UdpTaskHolder
	 */
	@SuppressWarnings("unused")
	private static class UdpTaskHolder {

	}

}


以上是必要的处理模块;此外,还可以自定义 post-action 和 pre-action 扩展。

post-action/pre-action模块
接口
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;


/**
 * Post-processor hook: user-defined handling of a task's processing outcome.
 *
 * @author syz
 * @date 2014-9-30 13:36:40
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.PostProcess
 */
public interface PostProcess {

	/**
	 * Handles the outcome of a processed task.
	 *
	 * @param result the processing outcome
	 */
	@SuppressWarnings("unchecked")
	void postProcess(TaskProResult result);

}


适配器
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import org.apache.log4j.Logger;

/**
 * Default post-processor: writes the outcome's attributes to the log and
 * does nothing else.
 *
 * @author syz
 * @date 2014-9-30 13:38:53
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.PostProcessAdapter
 */
public class PostProcessAdapter implements PostProcess {
	private static final Logger LOG = Logger.getLogger("PostProcessAdapter");
	@SuppressWarnings("unused")
	private Object process;

	/**
	 * Logs the success flag, result payload, extra information and failure
	 * cause of the outcome, in that order.
	 *
	 * @param result
	 *            the processing outcome
	 */
	@SuppressWarnings("unchecked")
	@Override
	public void postProcess(TaskProResult result) {
		final String prefix = "result----";
		LOG.info(prefix + "SUCCESS:" + result.isSuccessful());
		LOG.info(prefix + "RESULT:" + result.getResult());
		LOG.info(prefix + "EXTINFO:" + result.getExtraInfo());
		LOG.info(prefix + "EXCEPTION:" + result.getThrowable());
	}

}



一个实现
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import com.gbcom.ccsv3.transport.dispatcher.task.UdpTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskState;
import com.gbcom.ccsv3.util.HttpClientUtil;
import com.gbcom.system.utils.JsonUtil;

/**
 * Post-processor for UDP tasks: after a task has been processed, it notifies
 * a remote endpoint over HTTP for the {@code forceoffline} item and merely
 * logs for the {@code gwebOper} item.
 *
 * <p>
 * NOTE(review): per the original author's note this is used from a
 * controller, as the callback to the cloud platform once an asynchronously
 * requested operation has completed. Confirm with the caller.
 *
 * @author syz
 * @date 2014-11-5 18:31:59
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.InformPostProcess
 */
public class InformPostProcess implements PostProcess {
	private static final Logger LOG = Logger.getLogger(InformPostProcess.class);

	/**
	 * Dispatches on the item of the processed UDP task. Outcomes whose origin
	 * is not a {@link UdpTask}, and tasks without an item, are ignored.
	 * 
	 * @param result
	 *            the processing outcome
	 */
	@SuppressWarnings("unchecked")
	@Override
	public void postProcess(TaskProResult result) {

		if (!(result.getOrigin() instanceof UdpTask)) {
			return;
		}
		UdpTask udpTask = (UdpTask) result.getOrigin();
		// Compare with the literal on the left: the item may be null.
		String item = udpTask.getItem();
		if ("forceoffline".equalsIgnoreCase(item)) {
			notifyForceOffline(result);
		}
		if ("gwebOper".equalsIgnoreCase(item)) {
			ignore(result);
		}
	}

	/**
	 * Handles items that need no further action; only logs.
	 */
	@SuppressWarnings("unchecked")
	private void ignore(TaskProResult result) {
		LOG.info("gwebOper return ,,this in not advance process ,can ignore");
	}

	/**
	 * Posts the task's item, key and JSON payload to the remote endpoint and
	 * marks the task INVALID unless the response reports "success".
	 */
	@SuppressWarnings("unchecked")
	private void notifyForceOffline(TaskProResult result) {
		UdpTask udpTask = (UdpTask) result.getOrigin();
		Map<String, String> postmap = new HashMap<String, String>();
		postmap.put("type", udpTask.getItem());
		postmap.put("gwid", udpTask.getTaskKey());
		postmap.put("json", udpTask.getJson());

		// "replace urll" is a placeholder; supply the real callback URL.
		String responseData = HttpClientUtil.post("replace urll", postmap);

		String rst = (String) JsonUtil.jsonToMap(responseData).get("result");

		if (!"success".equalsIgnoreCase(rst)) {
			// Per the original note: an INVALID task sends no further UDP
			// message but still gets the pre/post hooks if it is re-queued; a
			// retry limit could be added here.
			udpTask.setState(TaskState.INVALID);
		}
	}

}


pre-action 跟post类似,仅仅写出接口
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * Pre-processor hook: user-defined handling applied to a task before it is
 * processed.
 *
 * @author syz
 * @date 2014-9-30 13:36:40
 * @version v1.0.0
 * @see PreProcess
 */
public interface PreProcess {

	/**
	 * Handles the task before processing.
	 *
	 * @param task the task about to be processed
	 */
	void preProcess(TaskBase task);
}




客户端使用


自定义事件类型
自定义事件的process
如果需要,扩展pre/post action
开启服务就可以工作,不需要关注线程和阻塞队列。

分享到:
评论

相关推荐

    《鸿蒙操作系统开发入门经典》中1

    在分发这些任务时,开发者可以使用鸿蒙操作系统提供的TaskDispatcher来实现不同的任务分发策略。 TaskDispatcher TaskDispatcher是鸿蒙操作系统提供的一种任务分发器。它可以用来分发不同的任务,并提供了多种实现...

    HarmonyOS线程管理1

    TaskDispatcher 是 HarmonyOS 中的一种任务分发器,用于分发不同的任务。它隐藏任务所在线程的实现细节,提供了多种实现,每种实现对应不同的任务分发器。在分发任务时,可以指定任务的优先级,由同一个任务分发器...

    FileDownloader-master.zip

    2. **TaskDispatcher**:任务调度器根据任务的优先级和当前系统资源进行任务分发,保证任务的有序执行。 3. **FileStore**:负责文件的存储管理,包括临时文件的保存和最终文件的合并。 4. **Database**:存储下载...

    nacos技术分享注册中心和配置中心)

    TaskDispatcher 负责将数据的 key 传入 TaskScheduler 中,并将任务提交到 DataSyncer 中。DataSyncer 负责将数据广播到所有 Server 节点,并将目标 Server 信息设置到 SyncTask 中。 此外,配置中心还提供了多种...

    华为harmonyos公测-HarmonyOS-examples:鸿蒙示例

    华为harmonyos公测 HarmonyOS应用开发资源库 HarmonyOS应用开发的各种示例,项目...:异步任务EventHandler/TaskDispatcher 示例 harmonyos-data Ability:操作本地数据库 示例 harmonyos-service Ability:后台运行任

    lightning:Android App快如闪电的启动器

    一,简介 Android App快如闪电的启动器 二,介绍方式 将JitPack存储库添加到您的内置文件中(项目根目录下build.gradle文件) allprojects { ... TaskDispatcher dispatcher = TaskDispatcher.createInstan

Global site tag (gtag.js) - Google Analytics