`
herman_liu76
  • 浏览: 100300 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

回忆背调业务核心组件的开发

    博客分类:
  • java
阅读更多
    这是我第一次做java技术比较全面和复杂的系统,当时刚从事互联网开发,它与传统单机增删改查的Web应用差别很大,那只是业务复杂。当时除了学习很多工具技巧外包括maven/git使用都才入门,线程处理的相关技术整合使用还不多,自己一个人一下做这个还是蛮有压力的,新的工作需要打响第一P。这时我只看过一部分dubbo与druid源码,有了些想法,最后一步步也顺利完成了。
    开发一个系统其实就是组织一个工厂,安排各种人员,各司其职又相互协作,处理业务的过程。不过后来又看了些源码,比如rocketmq,发现有些相似的思想,也有更多设计显的稚嫩。现在看的多了,更可以参考优秀的设计,无法结构/代码/细节功能都可以向高水平看齐。
    本系统的调查中心有两个核心类,一个是子任务应用注册管理类,类比注册中心,一个是消息处理容器类,内部包括线程池,内部消息处理,重试,持久化,监听连接,选择子任务应用策略类等。另外就是自研协议的通讯中间件处理调用。

    两年前做的系统,有时介绍起来都想不起来细节了,因为很多地方都反复想了多个方案,比如是否按类型分线程池处理等。所以打算总结一下,随便分析一下可以改进的地方。

# 1. 系统的功能与特点


1.1 总体项目概述

​ 公司已经有一个征信产品,是两个单体架构组成,其中一个web应用,将征信请求进入redis队列,另一个单体应用负责从redis中取出请求,进行相关的处理,并将结果写入mongoDb,web应用会轮询结果显示。

​ 本次开发的背调产品,经过总监,CTO,架构师,及我们开发组长们讨论后,相关的思考与决定如下:

  • - ​ SOA架构:为了支撑大并发,高流量的互联网应用,应用拆分为多个细粒度服务,服务要求无状态,接口实现幂等。包括企业服务(背调发起与查询),用户服务(简历与应聘记录),产品服务(背调产品,单次,计次,包月等,单项有按次,按结果收费,是否退费),订单服务,调查服务,支付服务,短信服务等。为了快速搭建整个系统,服务之间通过restful调用,自研通讯组件与协议架构师完善并在我开发的调查中使用。我喜欢用流行的,比较新的技术,但我们提出的使用dubbo没被同意,后来在渠道推广模块使用了。
  • - ​ 服务库:各服务负责自己的数据库,基于mycat的读写分离。订单是最核心的数据,企业是主要的服务对象,用户这边弱化了,毕竟招聘不是重点,可能只操作背调授权。所以服务企业才是重点,订单按企业id分片,仅冗余订单主要信息供其它维度查询,订单id包含企业id,所以详情都走分片查询。
  • - ​    缓存中间件:共用征集的redis缓存,使用了redis的哨兵模式。主要是存放登录相关,如token;存放表级数据,比如用户,产品等数据;存放接口调用缓存,比如最多的查询企业订单列表;存放未完成订单。
  • - ​    消息中间件:有利于应对高并发,有利于多点订阅或者不确定的消费方,也可以延时处理。但暂时没有人手可靠搭建,所以没有使用。于是在调查串行的重点应用中前端要进行限流和有专门线程处理问题订单。


1.2 我负责的业务

​ 企业服务平台中选择调查产品,录入或者选择投递的调查人信息,发起调查开始的所有相关业务。主要有生成订单,扣次数或者扣费后,发起授权。被调查人授权后,提交调查系统处理,根据调查结果进行费用调整。前台轮询处理结果。由于有预付,有授权,所以订单可能会停留在未支付,未授权,调查中等中间状态,其它连续动作由TCC分布式事务保证。另外还有安排相关人员进行订单服务、子调查系统开发,后来又负责企业服务。

​ 本文介绍的只是其中最复杂的调查系统处理。特点:
  • - 类似消息中间件,使用公司自己的通讯组件与协议。
  • - 要实现处理子调查应用的注册,心跳,状态上报,监听,重试,选择策略。
  • - 实现调查消息的持久化、分发,消费确认过程。
  • - 同时要前端限流,以及系统自身的维护线程。
  • - 子应用的处理与状态上报,分有离线人工调查与自动调查。


1.3 本系统介绍

​    系统有点类似于消息中间件的开发,调查服务端(以下简称  SS)接收的消息是一个个背调请求,系统要持久化,每个请求要再要分解出一个个单项调查子任务,推送到各个调查客户端(以下简称  SC)进行消费,及时的调查返回结果,人工调查 SC只返回收到任务。

​    比如用户选择调查一个人的身份/学历/处罚/不良信息。SS接收这个任务,拆解后分别发送给处理身份,学历,处罚,不良信息的四类SC,每类可能不只一个应用,也许有两个调用不良信息的SC都可以完成单独的任务。这个系统有以下几个特点:

  • - SS与SC之间通过公司自己开发的基于netty的通讯组件进行,我参与了这个组件开发,在分析通讯协议设计的文章中介绍了我们的通讯组件。rocketmq中也是自己的通讯组件,自己的专用协议。
  • - 这个过程中还要维护调查客户端的连接,状态,同类的客户端要进行选择性的推送。这个类似于nameServer的功能,有心跳功能,要监听通讯通道的状态,我们集成在SS中,更没有实现高可用。
  • - SS要嵌入到一个web应用中,接收企业服务应用过来的http背调请求,未来内部使用还考虑也支持自己的remote组件,比如相应的企业服务中可以引用背调请求的生产者。当然对外部推广客户还是要http的方式的跨防火墙请求。
  • - 每一个SC也会嵌入一个web应用中,可以查看接收的单项调查子任务的情况。SC要容纳各种子任务的实现,通过配置化只加载其中的一种。SC只处理push过来的子任务,没实现主动pull。
  • - SS收到的消息要支持持久化,提供相应的接口,可以配置具体的方式。
  • - 异步处理为主。比如SC完成的结果后通知SS,SS有监听器异步监听完成情况,进行更新。
  • - 后期由于提出了人工干预同类SC的处理,以及有人工电话调查SC并准备使用dubbo技术,所以又改造了一下,增加了中间处理层(以下简称SM),同类的SC注册到SM中,而SM又注册到SS中。SM也有局部的nameServer的功能,也有业务功能。



# 2. 项目的组成

​ 项目由一个总的pom工程,和三个jar工程组成,分别是survey-client,survey-middle,survey-server组成。



​ 下面以survey-server为主,其它两个模块简单介绍。

# 3. 服务端的设计

## 3.1 总体设计

​ 服务端有两大功能,分别是处理背调请求,维护客户端的状态。有点类似与rocketmq的broker与nameSvr的两大功能。

​ 这两个功能都有核心类,因为因为附属功能都在其中,所以我当时喜欢叫Container,也许现在会学rocketmq叫controller了。两个核心类不需要远程通讯,但它们相互引用。

​ TaskContainer管理任务,AppContainer管理注册的客户端,它们由一个inti类进行统一启动两个核心类,以及注入持久化实现类。同时init类还会启动一个守护进程,输出一些核心类中的重要日志信息给控制台。




## 3.2 通讯中间件使用

基于netty开发。

服务端事先配置可以连接的客户端的信息,包括appKey,code,appSecret等信息。如果客户端连接成功了,会有一个sessionId,由服务端保存。其内部有验证,重连等机制。

通讯层本身有连接心跳功能,但上层还需要一个业务心跳,传递业务执行情况与服务器状态变化 ,上层选择客户端时就可以基于多种策略了。

### 3.2.1 服务端发消息与处理

**服务端的启动:**

```java
//服务端启动,包括端口以及可允许连接的客户端信息。这些信息用于底层校验客户端
ServerInit.init(serverPort, clientAppList);
```


**服务端push消息给客户端:**

一般通讯层服务端会返回连接好的channel给使用者,使用者可以包装成自己的channel进行使用。这里并不提供channel,其内部持有。外部使用如下方式:

```java
//ServerPushHandler是通讯层的类,有静态方法发信息给客户端。
//根据客户端的sessionId发送一个【任务(名称)】以及数据,并设置好返回结果的回调对象来异步处理返回值。
//当然也可以当成dispatcher来用。
boolean bln = ServerPushHandler.pushBySessionId(subTaskInfo.getSurveyClient().getSessionId(), "assignTaskToClient", body, new ServerPushTaskCallback());
```


客户端会注册对应【任务(名称)】的处理类,来处理接收到的数据并返回值。

```java
//客户端配置对应的处理类。
PushReceiverCenter.registReceiver("assignTaskToClient", new PushReceiverFeedbackHandler(apiInvorkerInterface));
```


### 3.2.2 客户端发消息给与服务端处理

**客户端启动:**

```java
//设置状态监听类。底层netty监控到状态变化会通知这个类对象来处理
client = ClientCenter.getAClient(this.serverIP, serverPort, this.appkey, this.appSecret,clientStatusListener);
```


**发送与接收处理:**

```java
//客户端发送业务心跳数据的方式,消息本身包含【消息名称】
//同时还设置了服务端返回值的处理类,确认服务端已经收到了。
MiddleMsg msg = new MiddleMsg("getClientSystemInfo", body);
ResultObject res = client.sendMsg(msg, new ReportClientInfoFeedbackHandler());


//服务端注册消息的处理类,根据【消息名称】,选择对应的处理类
register.addMsgServiceHandler("getClientSystemInfo",RecvClientInfoHandler);
```




### 3.2.3 设计改进

​ 在rocketmq中,通讯层与核心类不直接引用,中间有一个outApi的类隔离着,对核心类与其它类提供所有的通讯功能服务。

​ 我的设计中,通讯层属于AppContainer,直接使用了,考虑整体通讯的统一性,应该被一个独立的类被引用使用,独立的类负责通讯,注册消息处理类等功能。



## 3.2 客户端管理核心类的功能

### 3.2.1 属性

​ 客户端管理类,主要有配置的客户端与在线的客户端。由于由服务端与客户端两层改为三层,并且原始的配置值被意义被要求改变,比如appKey由一个客户端变成了一类客户端。还涉及到客户端配置管理服务不能及时变化造成的一定冗余。有些传参被要求变动,还涉及到通讯层的参数变动,所以属性中有些意义发生变化。

​ ClientApp是通讯层要的配置客户端类,SurveyApp是本业务中的客户端。

```java
public class AppContainer {
	
	private static final Logger logger =Logger.getLogger(AppContainer.class);
	private String                           serverIp;
	private String                           serverPort;
	/**用户单例锁*/
	private static Boolean lockSigleton = true; 
	private static AppContainer appContainer;//单例使用
	
	/**配置的app,由于底层没有code,用appKey记录,所以监听底层的Client用AppKey确定。
	//【appKey--(SurveyApp--SurveyClientList)】*/
	public Map<String,SurveyApp> appHolder=new HashMap<String,SurveyApp>();
	
	/**配置的app,任务用Code记录(在任务处理中,因为子任务中使用AppCode指明使用的App类型,这是从产品那边配置的,不太可能用AppKey这个不是很清晰的码)
	 * 【appCode--(SurveyApp--SurveyClientList)】
	 */
	public Map<String,SurveyApp> appCodeHolder=new HashMap<String,SurveyApp>();
	
	/**配置的Client(ClientCode--SurveyClient)*/
	public Map<String,SurveyClient> clientHolder=new HashMap<String,SurveyClient>();
	
	/**可维护的Client在线列表。
	 * <key为appKey(代表一类客户端),内部一组客户端,有并发控制<sessionId,client>>
	 【appKey--(sessionId--onlineClient)】
	 */
	public volatile Map<String, ConcurrentHashMap<String, SurveyClient>> onlineClientMap=new HashMap<String, ConcurrentHashMap<String,SurveyClient>>();
	
	/**根据配置的SurveyClient,生成ClientApp列表,仅用于启动中间件前给底层中间件传递*/
	private List<ClientApp> clientAppList=new ArrayList<ClientApp>();
	
	private TaskContainer taskContainer;//引用任务处理容器
	
	public static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	
	public boolean isMiddlewearStarted=false;
	
```


### 3.2.2 主要方法

- 启动通讯服务端,注册三个业务处理类:客户端状态监听,处理客户端业务心跳,处理客户端完成任务情况上报处理。

```java
    public void initMiddlerWareStart(){
		logger.info("【背调中心】注册客户心跳消息与任务完成消息的回调处理");
		try {
			// 获取应用client基本信息
			MsgServiceHandlerRegister register = MsgServiceHandlerRegister.getRegister();
			//注册事件处理类
			MsgServiceHandlerRegister.setEventHandlerClass(ClientStatusListener.class);
	register.addMsgServiceHandler("getClientSystemInfo",RecvClientInfoHandler.class);
			register.addMsgServiceHandler("subTaskFinishInfo", RecvClientTaskHandler.class);
			new Thread(new Runnable(){
			    @Override
			    public void run() {
			        // TODO Auto-generated method stub
			    	try {
			    		isMiddlewearStarted=true;
			    		ServerInit.init(StringUtils.isEmpty(serverPort)?9166:Integer.parseInt(serverPort), clientAppList);
					} catch (Exception e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
						isMiddlewearStarted=false;
					}
			    }
			}).start();
			logger.debug("【背调中心】启动消息服务成功!端口:" + serverPort);
		} catch (Exception e) {
			isMiddlewearStarted=false;
			e.printStackTrace();
			logger.error("【背调中心】启动消息服务失败!异常:" + e.toString());
		}
    }
```


- ClientStatusListener主要监听客户端登录成功事件,以及客户端断开连接事件。登录成功将产生一个surveyClient,并以sessionId为key存在map中。surveyClient还有一部分业务信息,比如权重等,来自业务心跳给补充。

```java
//public class ClientStatusListener extends AbstractEventHandler中的方法。
//将产生一个surveyClient客户端。
@Override
	public void loginSuccess(EventInfo res) {
		// TODO Auto-generated method stub
		super.loginSuccess(res);
		ContainerInit containerInit = ContainerInit.getInstance();
		if (containerInit != null) {
			ClientApp clientApp = res.getAppinfo();
			Map<String, SurveyClient> surveyClientMap = AppContainer.instance.onlineClientMap.get(clientApp.getAppKey());
			if (surveyClientMap == null)
				surveyClientMap = new ConcurrentHashMap<String, SurveyClient>();
			SurveyApp surveyApp = AppContainer.appHolder.get(clientApp.getAppKey());
			// 1.新建一个调查客户端,以sessionId为key,记录在app表下面。
			// surveyClient中一部分来源上监听,另一部分信息要来源于业务心跳。
			SurveyClient surveyClient = new SurveyClient(clientApp.getIp(), "", clientApp.getSessionId(), clientApp.getChannelId(), surveyApp);
			surveyClientMap.put(clientApp.getSessionId(), surveyClient);
			// 2.新建一个准备放置客户端下的子任务。
			TaskContainer.clientSubTaskInfoMap.put(clientApp.getSessionId(), new ArrayList<SubTaskInfo>());// 新建此客户端下的子任务容器
			logger.info("【中间件状态监听】此APP当前客户端总数:" + surveyClientMap.size());
		}
	}


//public class RecvClientInfoHandler implements MsgServiceHandler中的方法。
//将对surveyClient客户端信息进行补充。包括客户端的类型code,权重,更新时间等,未来增加其它性能数据。
	@Override
	public MiddleMsg handleMsgEvent(MsgEvent dm, MiddleMsg msg) {
		String body = msg.getBody() + "";
		String sessionId = msg.getHeader().getSessionID();
		String returnCode = "";
		SurveyResponse td = new SurveyResponse();
		try {
			ClientRealData clientRealData = JsonUtils.toBean(body, ClientRealData.class);
			//将客户端的实时信息设置到在线客户端的属性中
			Map<String, SurveyClient> onlineClientMap=(Map<String, SurveyClient>) AppContainer.onlineClientMap.get(clientRealData.getAppkey());
			SurveyClient surveyClient=onlineClientMap.get(sessionId);
//			logger.debug("【处理客户心跳】客户端【存在吗】?:"+surveyClient!=null);
			if(surveyClient!=null){
				logger.debug("【处理客户心跳】clientRealData:"+clientRealData.getClientCode()+"@"+clientRealData.getAppCode());
				surveyClient.setClientCode(clientRealData.getClientCode());
				surveyClient.setUpdateTime(new Date());
				surveyClient.setWeight(clientRealData.getWeight() == null ? "60" : clientRealData.getWeight());
			}
			returnCode = "success";
		} catch (Exception e) {
			e.printStackTrace();
			returnCode = "failure";
			logger.error("【处理客户心跳】消息失败!异常:" + e.toString());
		}
		td.setCode(returnCode);
		msg.setBody(td);
		return msg;
	}
```


  根据业务的类型与企业客户的级别,选择一个可用的客户端,这部分改变比较大,包括构建treemap得到权重与概率与级别要求,同类型还要按已经分配的任务决定给最少任务的。如果正好又不能用了,再递归找一个可用的。

```java
	public static SurveyClient getClientByUserRankAndClinetLever(String appCode,int userRank) {
		// 从appCode得到appKey,从而找到可用的在线客户端.让前端根据appKey来分子任务不可靠
		SurveyApp surveyApp = appCodeHolder.get(appCode);
		logger.debug("【策略2】未配置客户端,返回!appKey:"+appCode);
		return getClientByUserRankAndClinetLeverByAppkey(surveyApp.getAppKey(),userRank);
	}
	private static SurveyClient getClientByUserRankAndClinetLeverByAppkey(String appKey,int userRank) {
		// 从appCode得到appKey,从而找到可用的在线客户端.让前端根据appKey来分子任务不可靠
		//SurveyApp surveyApp = appCodeHolder.get(appCode);
		Map<String, SurveyClient> onlineSurveyClientMap = onlineClientMap.get(appKey);
		if(onlineSurveyClientMap==null){
			logger.info("【策略2】未配置客户端,返回!appKey:"+appKey);
			return null;
		}
		
		int onlineClientNum=onlineSurveyClientMap.size();
		logger.debug("【策略2】surveyApp(code|在线客户端数):"+appKey+"|" + onlineClientNum);
		// 用于权重随机的参数对象
		//logger.debug("【策略】可选用客户端的数:" + onlineClientNum);
		Map<String, Integer> canUseClient = new HashMap<String, Integer>();//用于treemap排序。
		List<SurveyClient> sameLeverClientList = new LinkedList<SurveyClient>();//用于同级别客户端存放
		if (onlineClientNum == 0) {
			logger.info("【策略2】备选客户端为0,返回!");
			return null;
		}else {
			Iterator iter = onlineSurveyClientMap.entrySet().iterator();
			while (iter.hasNext()) {
				Map.Entry<String, SurveyClient> entry = (Map.Entry<String, SurveyClient>) iter.next();
				Integer eachWeight = new Integer(entry.getValue().getWeight() == null ? Constants.CLIENT_BASE_LEVER+"" : entry.getValue().getWeight());
				canUseClient.put(entry.getKey(), eachWeight);
				logger.debug("【策略2】循环可选用客户端详细信息(sessionId|weight):" + entry.getKey() + "|" + eachWeight);
				if(onlineClientNum==1) return entry.getValue();
			}
		}
		
		//找出可用的客户端,产生一个map,再构建treemap,用策略得到一个值。
		logger.info("【策略2】准备筛选的客户端个数:"+canUseClient.size());//canUseClient不含有重复的,所以得到选择的客户端值,还要再处理多个同值的情况。
		RankAndLeverTreeMapSelect rankAndLeverTreeMapSelect = new RankAndLeverTreeMapSelect(canUseClient,0);
		Integer chooseValue = rankAndLeverTreeMapSelect.chooseValue();
		logger.debug("【策略2】选择出的客户端lever:"+chooseValue);
		if(chooseValue==null) return null;
		for(SurveyClient surveyClient:onlineSurveyClientMap.values()){
			if(surveyClient.getWeight() == null) surveyClient.setWeight(Constants.CLIENT_BASE_LEVER+"");
			logger.debug("【策略2】当前比对的surveyClient.getWeight(null=60):"+(surveyClient.getWeight()));
			//if(StringUtils.isBlank(surveyClient.getWeight())) continue;//没有就是60分
			if(chooseValue.intValue()==new Integer(surveyClient.getWeight()).intValue()) sameLeverClientList.add(surveyClient);
		}
		if(sameLeverClientList.size()==1) return sameLeverClientList.get(0);
		//如果同一个值的客户端有多个,再排序,取任务最少的一个。
		Collections.sort(sameLeverClientList,new Comparator<SurveyClient>() {
			@Override
			public int compare(SurveyClient surveyClient1, SurveyClient surveyClient2) {
				//以下如果改变顺序则调换一下参数位置
				return surveyClient1.getTaskCount()-(surveyClient2.getTaskCount());
			}
			
		});
		SurveyClient surveyClient=sameLeverClientList.get(0);
		sameLeverClientList.clear();
		
		//如果找到的客户端不能用,就递归找一个,同时移除这个客户端
		SecretManagement m = ServerGlobal.sessionWithAppKeys.get(surveyClient.getSessionId());
		if(m!=null  && m.getChannel()!=null && m.getChannel().isWritable()){
			return surveyClient;
		}
		else{
			Map<String, SurveyClient> surveyClientMap=AppContainer.onlineClientMap.get(appKey);
			if(surveyClientMap.containsKey(surveyClient.getSessionId())){
				logger.debug("【策略。推送失败移除客户端再递归获取】sessionId:"+surveyClient.getSessionId());
				surveyClientMap.remove(surveyClient.getSessionId());// 根据通讯客户端,移除里面的调查客户端对象
			}
			return getClientByUserRankAndClinetLeverByAppkey(appKey,userRank);
		}
	}
```


```java
    /**rankAndLeverTreeMapSelect.chooseValue();
     * 策略:
     * 1.如果有用户级别值,比如90分,那100、80、70、60、40、20分的客户端中,选择最近的80分的客户端。
     * 2.如果用户没有级别,那80/70/60/40/20中,选择及格的最低的60,如果都不及格,选择最高的40。
     * 3.选择了一个分值的客户端,如果这里面有多个,再随机选择一个(未来根据完成情况或者性能)
     * <P></P>
     * @return
     */
	@Deprecated
    public K choose() {
...
    }
    
    /**
     * 考虑到重复情况,不能返回key了,只能返回特定value后再循环处理。
     * @return
     */
    public Integer chooseValue() {
    	if(treeMap.size()==0) return null;
    	if(treeMap.size()==1) return treeMap.firstEntry().getKey();
        if(_userRank>0d){//如果有用户级别
        	logger.debug("有用户级别值,找接近最大的。_userRank:"+_userRank);
        	SortedMap<Integer, K> headMap = this.treeMap.headMap(_userRank, true);
        	logger.debug("_userRank & headMap.size:"+_userRank+"|"+headMap.size());
        	if(headMap.size()==0) return treeMap.firstEntry().getKey();//如果找不到,给最低的。
        	return headMap.lastKey();
        }
        else{//如果无用户级别
        	logger.debug("无用户级别值,找及格里最小的。_baseLever:"+_baseLever);
        	SortedMap<Integer, K> tailMap = this.treeMap.tailMap(_baseLever, true);
        	logger.debug("_baseLever & headMap.size:"+_userRank+"|"+tailMap.size());
        	if(tailMap.size()==0) return treeMap.lastEntry().getKey();//如果都生活及格,找一个最大的。
        	return tailMap.firstKey();
        }
    }
```


## 3.3 业务处理核心类的功能

### 3.3.1 TaskContainer主要的属性

包括了背调任务存放,失败处理队列,子任务分发线程池,外部持久化接口实现,子任务完成监听类。超时时间,尝试次数配置。

```java
/**
 * 任务容器-管理背调任务与相关子任务
 * @author liujun
 * @date 2018年1月18日 上午10:33:01
 */
public class TaskContainer {
	private static final Logger logger = Logger.getLogger(TaskContainer.class);

	/** 实时总任务信息(taskid---TaskInfo(SubTaskInfoMap)) */
	public volatile static Map<String, TaskInfo> taskInfoMap = new ConcurrentHashMap<String, TaskInfo>();

	/** 任务在内存中允许的最大存放数 */
	private static Integer maxTaskMapSize=Integer.MAX_VALUE;
	/**
	 * 实时子任务信息(subtaskid---SubTaskInfo)
	 * 目的:子任务完成后,根据subTaskId从这里快速拿到对应的子任务。从上面的主任务不方便找。
	 * 不需要了,子任务中带有主任务ID,所以还是先拿主任务,再取子任务处理。
	 */
//	public volatile static Map<String, SubTaskInfo> subTaskInfoMap = new ConcurrentHashMap<String, SubTaskInfo>();
	
	/**使用阻塞队列,放置所有要处理的失败子任务.失败的任务先会再回线程池,之后超时会触发返回*/
	BlockingQueue<SubTaskInfo> subTaskInfoQueue = new LinkedBlockingQueue<SubTaskInfo>();
	
	/**任务超时是否自动处理,此超时不是推送客端尝试多次,而是等待子任务完成*/
	public boolean autoDealTimeoutSubtask = false;
	/**
	 * 子任务推送的最多尝试次数
	 */
	public static int maxRePushSubTaskTimes=5;
	/**
	 * 等待子任务完成的超时的时间
	 */
//	public static long maxTimeoutSubTaskDealTime=24*60*60000L;
	
	/**线上子任务超时时间*/
	public static long maxTimeoutOnlineSubTaskTime=60*1000L;
	
	/**主任务超时时间*/
	public static long maxTimeoutTaskTime=2*24*60*60*1000L;

	/**
	 * 一个配置的client下的实时子任务信息(sessionId-List<SubTaskInfo>)
	 * 用sessionId方便应对底层的上下线变化。中间件的事件只能得到sessionId,没有ClientCode。
	 */
	public volatile static Map<String, List<SubTaskInfo>> clientSubTaskInfoMap = new ConcurrentHashMap<String, List<SubTaskInfo>>();

	/** app,client配置容器类 */
	private AppContainer appContainer;

	/** 子任务分派用线程池 */
	private static ExecutorService executor = Executors.newCachedThreadPool();

	/** 监听器-子任务完成 */
	public SubTaskListener subTaskListener = new SubTaskListener();

	/** 监听器-客户端上下线事件 */
	public ClientStatusListener clientStatusListener = new ClientStatusListener();

	/** 任务池锁 */
	private Object TaskLock = new Object();

	/** 任务处理线程 */
	public SubTaskRedo subTaskRedo = new SubTaskRedo();
	/**
	 * 外部持久化任务接口
	 */
	public TaskPersistenceInterface taskPersistenceInterface;
```


### 3.3.2 主要方法

#### 接收背调任务及选择客户端后发出去

在initContainer中已经用TaskFactory,把请求参数处理成TaskInfo对象了,处理过程中已经持久化了。
```java
		String queryJsonStr = jsonParam.toString();
		logger.info("【发起任务】背调任务请求参数:" + queryJsonStr);
		if (containerInit != null) {
			logger.info("---------【用户发起背调了...】--------");
			TaskInfo taskInfo = TaskFactory.creatTaskInfo(jsonParam);
			taskContainer.createAndPutTaskPool(taskInfo);
		} else {
			logger.warn("【发起任务】背调中心没有启动");
		}
```


taskContainer处理背调任务:

```java
	/**
	 * <P>
	 * 根据提交请求,生成主任务子任务,放入登记的map,并放入子任务队列
	 * </P>
	 * @param paras
	 * @param appCode
	 * @throws SurveyException 
	 */
	public boolean createAndPutTaskPool (TaskInfo taskInfo) throws SurveyException {
		boolean createResult=false;
//		TaskInfo taskInfo = TaskFactory.creatTaskInfo(paras);
		logger.debug("【主任务Map添加】当前总数:" + taskInfoMap.size());
		if (taskInfoMap.size() >= maxTaskMapSize) {
			logger.error("【任务登记MAP】已满!");
			throw new SurveyException("任务登记已经满");
//			return false;
		} else {
			taskInfoMap.put(taskInfo.getTaskId(), taskInfo);
			Iterator<Map.Entry<String,SubTaskInfo>> iter =taskInfo.getSubTaskMap().entrySet().iterator();
			while (iter.hasNext()) {
				Map.Entry<String, SubTaskInfo> entry = (Map.Entry<String, SubTaskInfo>) iter.next();
				try {
					startExecutorCompletionService(entry.getValue());
				} catch (InterruptedException | ExecutionException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
					throw new SurveyException("任务提交线程池失败",e);
				}
			}
			logger.debug("【主任务池添加】当前总数+1后:" + taskInfoMap.size());
			//改阻塞队列不需要另加锁
//			synchronized (TaskLock) {
//				TaskLock.notifyAll();
//			}
			createResult = true;
		}
		logger.debug("【任务登记MAP】情况如下:");
		Iterator<Map.Entry<String,TaskInfo>> iter =taskInfoMap.entrySet().iterator();
		while (iter.hasNext()) {
			Map.Entry<String, TaskInfo> entry = (Map.Entry<String, TaskInfo>) iter.next();
			logger.debug("【任务登记MAP】主任务id|(总|推|执|完):"+entry.getValue().getTaskId()+"|"+entry.getValue().getTotalTask()+"|"+entry.getValue().getPushTask()+"|"+entry.getValue().getExecuteTask()+"|"+entry.getValue().getCompleteTask() );
		}
		return createResult;
	}
```


上面的拆分后的子任务处理:startExecutorCompletionService(entry.getValue());

```java
	/**
	 * <P>将子任务设置一个可用的客户端后,通过线程池执行发送</P>
	 * 
	 * @param subTaskInfo
	 * @param taskContainer
	 * @throws InterruptedException
	 * @throws ExecutionException
	 */
	public void startExecutorCompletionService(SubTaskInfo subTaskInfo) throws InterruptedException, ExecutionException {

		if (StringUtils.isBlank(subTaskInfo.getAppCode()) && StringUtils.isBlank(subTaskInfo.getAppKey())) {
			logger.warn("【分派预处理】子任务未设置AppCode或者AppKey,子任务Id:" + subTaskInfo.getSubTaskId());
			return;
		}
		if (!StringUtils.isBlank(subTaskInfo.getStatus())) {
			//已经有状态的,都是从库里加载的,不处理了,只放池子里。等线下的回调,或者人工处理,或者超时了。
			logger.warn("【分派预处理】子任务已经有状态,不再分配推 。子任务Id:" + subTaskInfo.getSubTaskId()+",状态:"+subTaskInfo.getStatus());
//			if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType()))
//				subTaskInfoQueue.put(subTaskInfo);//如果成功的,并且是线上的,进行超时处理。
			return;
		}
		// 从在线客户端表中取一个
//		SurveyClient surveyClient = AppContainer.getWeightRandomClient(subTaskInfo.getAppCode());
		SurveyClient surveyClient =null;
		if(StringUtils.isBlank(subTaskInfo.getAppKey())){
			logger.debug("【老版本-按AppCode分配】-----------旧版getAppCode--"+subTaskInfo.getAppCode());
			surveyClient=AppContainer.getClientByUserRankAndClinetLever(subTaskInfo.getAppCode(),0);
		}else
		{
			logger.debug("【新版本-按Appkey分配】-----------新版getAppKey--"+subTaskInfo.getAppKey());
			surveyClient=AppContainer.getClientByUserRankAndClinetLeverByAppkey(subTaskInfo.getAppKey(),0);
		}
		if (surveyClient == null || surveyClient.getClientCode() == null) {// 后面表示没有业务心跳补充属性
//			logger.debug("");
			logger.warn("【分派预处理】暂无可用的客户端");
			logger.debug("");
			subTaskInfo.setExeErrorCount(subTaskInfo.getExeErrorCount()+1);
//			subTaskInfoQueue.put(subTaskInfo);//没客户处理,则回炉
			
			logger.debug("【分派预处理】原来回炉再次尝试,现在直接返回任务推送失败,再推无意义");
			// 自动处理,作为失败子任务返回
			JSONObject finishJason = new JSONObject();
			finishJason.put("code", "failure");
			finishJason.put("msg", "任务推送失败");
			JSONObject finishData = new JSONObject();
			finishData.put("taskId", subTaskInfo.getTaskInfo().getTaskId());
			finishData.put("subTaskId", subTaskInfo.getSubTaskId());
			finishData.put("remark", "暂无可用的客户端");
			finishJason.put("data", finishData);
			logger.debug("【子任务队列消费】:【暂无可用的客户端");
			//通用处理完成或者失败的子任务。推送试过了也就当子任务结束了。
			updateSubTaskStatus(finishJason);
			return;
		} 
		
		//设置执行子任务的在线客户端,之前有设置过其它的,也会被替换成当前的
		String sessionId=surveyClient.getSessionId();
		logger.debug("【分派预处理】策略选出的客户端sessionId为:" +sessionId );
		subTaskInfo.setSurveyClient(surveyClient);
		subTaskInfo.setClientCode(surveyClient.getClientCode());//标识最后一次使用的客户端

		//一个在线客户端下所有的任务中加入此任务。用于客户端下线后,更新下面的子任务
		 List<SubTaskInfo> subTaskInfoList =clientSubTaskInfoMap.get(sessionId);
		 if(subTaskInfoList==null) subTaskInfoList=new ArrayList<SubTaskInfo>();
		 subTaskInfoList.add(subTaskInfo);

		// 交线程池执行分派子任务
		Future<String> future = executor.submit(new AssignTask(subTaskInfo));
		String exeResult = "";
		try {
			exeResult = future.get(15L, TimeUnit.SECONDS);
		} catch (TimeoutException|ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			logger.warn("【分派预处理】出错e:" + e.getMessage());
			exeResult = "failure";
		} finally {
			logger.info("【分派预处理】子任务分派结果为:" + exeResult);
			boolean isSubTaskSendOk = "success".equals(exeResult);
			//推送后的维护
			clientSubTaskInfoMap.get(sessionId).remove(subTaskInfo);//从当前客户端下移除子任务
			subTaskInfo.setSurveyClient(null);//子任务清除客户端
            
			if(!isSubTaskSendOk){
				logger.info("【分派预处理】子任务分派不成功,重新回队列subTaskId:" + subTaskInfo.getSubTaskId());
				subTaskInfo.setExeErrorCount(subTaskInfo.getExeErrorCount()+1);
				subTaskInfoQueue.put(subTaskInfo);//不成功,则回炉
			}else{
				subTaskInfo.setAsignStatus("success");//表示分配成功,等结果了
				if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType()))
				subTaskInfoQueue.put(subTaskInfo);//如果成功的,并且是线上的,进行超时处理。
			}
			
			//对主任务进行状态标识
			TaskInfo taskInfo = subTaskInfo.getTaskInfo();
			synchronized (taskInfo) {
				TaskInfo.modifyTaskInfoPush(taskInfo, isSubTaskSendOk);
			}
		}
	}
```


Future<String> future = executor.submit(new AssignTask(subTaskInfo));中的线程池任务。

```java
	/**
	 * <P>线程池执行发送任务</P>
	 * 
	 * @author liujun
	 * @date 2018年1月15日 下午5:36:48
	 */
	static class AssignTask implements Callable<String> {
		private SubTaskInfo subTaskInfo;

		public AssignTask(SubTaskInfo subTaskInfo) {
			this.subTaskInfo = subTaskInfo;
		}

		@Override
		public String call() throws Exception {
			// Thread.sleep(3000);
			 logger.info("【推送线程任务】任务执行...");
			// if(true) return "success";//测试

//			String body = subTaskInfo.getParaObj().toString();
			//推送的子任务,只包括总任务ID,子任务ID,其它都是一个json中。
			SubTaskData subTaskData=new SubTaskData();
			subTaskData.setTaskId(subTaskInfo.getTaskInfo().getTaskId());
			subTaskData.setSubTaskId(subTaskInfo.getSubTaskId());
			subTaskData.setSubTaskType(subTaskInfo.getSubTaskType());
			subTaskData.setQueryJsonStr(subTaskInfo.getParaObj().toString());
			//这两个用于异步任务时,客户端按里面的IP,PORT发结果消息上来。
			//不管是什么,都加上这个。如果多个中间层,那要按这个回复信息。
			subTaskData.setServerIp(ContainerInit.getInstance().appContainer.getServerIp());
			subTaskData.setServerPort(ContainerInit.getInstance().appContainer.getServerPort());
			
			String body =(JsonUtils.toString(subTaskData));
			
			logger.debug("----------------【推送子任务】--------------------body:"+body);
			// String appCode = subTaskInfo.getAppCode();

			if (subTaskInfo.getSurveyClient() == null) {
				logger.info("【推送线程任务】任务未设置执行客户端:" + subTaskInfo.getDescription());
				return "failure";
			}
			String clientSessionId=subTaskInfo.getSurveyClient().getSessionId();
			logger.info("【推送任务任务】推送目标sissionId:" + clientSessionId);
			boolean bln = ServerPushHandler.pushBySessionId(subTaskInfo.getSurveyClient().getSessionId(), "assignTaskToClient", body, new ServerPushTaskCallback());
			logger.info("【推送任务任务】bln:"+bln);
			// boolean bln =
			// ServerPushHandler.pushByAppKey(subTaskInfo.getSurveyClient().getClientCode(),
			// "assignTaskToClient", body);// 消息推送,推送所有服务器
			
			//不管成功不成功,去除子任务与动态客户端的关联(成功就不要关联了,不成功也应该换其它的客户端了)
//			List thisClientSubTaskList=clientSubTaskInfoMap.get(clientSessionId);
//			if(thisClientSubTaskList==null) thisClientSubTaskList=new ArrayList<SubTaskInfo>();
//			logger.info("【推送任务推送】当前session下的子任务数:" + thisClientSubTaskList.size());
//			if (thisClientSubTaskList.size() > 0) {
//				thisClientSubTaskList.remove(subTaskInfo);
//				subTaskInfo.setSurveyClient(null);
//			}
//			Map<String, List<SubTaskInfo>> 
			
			//注意:【在返回值的furturn中处理移除或者再入队的操作】
			
			if (bln) {
				logger.info("【推送任务推送】子任务成功");
				subTaskInfo.setAsignStatus("success");
				return "success";
			} else {// 推送失败
				logger.error("【推送任务推送】子任务推送失败:" + subTaskInfo.getSubTaskId());
				subTaskInfo.setAsignStatus("failure");

				//按说客户端下线可以事件中移除,但偶尔出现没有移除,所以在这里将无法推送的,
				//将这个不可用的channel的的客户端移除
				//注意:【在返回值的furturn中处理移除或者再入队的操作】
				Map<String, SurveyClient> surveyClientMap=AppContainer.onlineClientMap.get(subTaskInfo.getSurveyClient().getSurveyApp().getAppKey());
				if(surveyClientMap.containsKey(subTaskInfo.getSurveyClient().getSessionId())){
					logger.info("【推送失败移除客户端】sessionId:"+subTaskInfo.getSurveyClient().getSessionId());
					surveyClientMap.remove(subTaskInfo.getSurveyClient().getSessionId());// 根据通讯客户端,移除里面的调查客户端对象
				}
				return "failure";
			}
		}
	}
```


#### 接收客户端任务完成的handler

将完成情况告诉配置进来的监听器,监听器会发起更新子任务的相关操作。

```java
	@Override
	public MiddleMsg handleMsgEvent(MsgEvent dm, MiddleMsg msg) {
		// TODO Auto-generated method stub
		String body = msg.getBody() + "";
		String code = "";
		logger.debug("【得到Client子任务返回结果】API返回子任务结果:" + body);
		SurveyResponse td = new SurveyResponse();
		try {
			JSONObject tdObject = JsonUtils.toJSONObject(body);
			// 子任务失败原因
			subTaskListener.onSubTaskFinished(tdObject);
			code = "success";
		} catch (Exception e) {
			e.printStackTrace();
			code = "failure";
			logger.error("【得到Client子任务返回结果】背调中心消息处理任务完成消息失败!异常:" + e.toString());
		}
		td.setCode(code);
		msg.setBody(td);
		return msg;

	}
```


监听后发起的更新操作:

```java
	/**
	 * <P>
	 * 根据监听的子任务完成结果,更新任务的状态,都完成后有可能从总任务表中移除,并且调用外部接口,进行持久化操作。
	 * </P>
	 * 
	 * @param finishJason
	 */
	public void updateSubTaskStatus(JSONObject finishJason) {
		// 根据子任务完成情况,修改子任务的状态,以及主任务的状态
		logger.info("【修改子任务的完成状态】:"+finishJason.toString());
		
		try {
			String code = finishJason.getString("code");
			String msg = finishJason.getString("msg");
			String subTaskId = finishJason.getJSONObject("data").getString("subTaskId");
			String taskId = finishJason.getJSONObject("data").getString("taskId");
			String remark = (finishJason.getJSONObject("data").containsKey("remark"))? finishJason.getJSONObject("data").getString("remark"):"";
//			String clientCode = finishJason.getString("clientCode");
			TaskInfo taskInfo=taskInfoMap.get(taskId);
			if(taskInfo==null){
				logger.info("【修改子任务的完成状态】内存中已经没有这个主任务了!taskId:"+taskId);
				//如果需要,找不到主任务了,还可以直接入库
				//如果非线上任务,有可能内存中没有了,因为非线上,内存中存在的时间太长了,占用比较大
				//是否都完成,以及上报都在接口中实现
				if(taskPersistenceInterface!=null) taskPersistenceInterface.modifySubTask2DB(finishJason);//子任务入库
				return;
			}
			SubTaskInfo subTaskInfo = taskInfo.getSubTaskMap().get(subTaskId);
			
			
//			if(Constants.TASK_TYPE_ONLINE.equals(subTaskInfo.getSubTaskType())){
//				logger.info("【修改子任务的完成状态】线上子任务不能人工处理!subTaskInfo.getSubTaskType():"+subTaskInfo.getSubTaskType());
//				//如果需要,找不到主任务了,还可以直接入库
//				return;
//			}
			//1.【收到】
			if("received".equals(code)) {
				logger.info("【修改子任务的完成状态,并移除】子任务为异步的,对方已经收到!code:"+code);
				subTaskInfo.setStatus(Constants.TASK_DOING);
				subTaskInfo.setDescription("子应用已经收到子任务");
				synchronized (taskInfo) {
					taskInfo.setReceivedAsyncTask(taskInfo.getReceivedAsyncTask()+1);
				}
				//这里啥也不做。因为是异步的,只是对方已经收到了。
				return;
			}
			if(Constants.TASK_FAILURE.equals(subTaskInfo.getStatus()) ||  Constants.TASK_DONE.equals(subTaskInfo.getStatus())  ){
				//这里啥也不做。可能总任务检测时设置了结果
				return;
			}
			subTaskInfo.setSurveyClient(null);
//			subTaskInfo.setUpdateTime(new Date());//数据持久化时再设置
			//2.【不成功】 默认成功。成功时...(一定是执行且成功的)
			if(!"success".equals(code)) {
				logger.info("【修改子任务的完成状态,并移除】子任务推送或者执行出错了!返回code:"+code);
				subTaskInfo.setStatus(Constants.TASK_FAILURE);
				subTaskInfo.setDescription(msg+remark);//中文失败与原因
				//这里是否加入出错队列再处理?还是先入库,以后从库中加载呢?都可以,目前先入。如果推送失败的,已经推过多次了,如果执行失败的,先不再推送了。
//				return;
			}//3.【成功】
			else{
				logger.info("【修改子任务的完成状态,并移除】子任务执行成功!code:"+code);
				String hasData = (finishJason.getJSONObject("data").containsKey("hasData"))? finishJason.getJSONObject("data").getString("hasData"):"";
				Integer dataCount =0;
				try {
					dataCount = (finishJason.getJSONObject("data").containsKey("dataCount"))? finishJason.getJSONObject("data").getInt("dataCount"):0;
				} catch (Exception e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				subTaskInfo.setHasData(hasData);
				subTaskInfo.setDataCount(dataCount);
				subTaskInfo.setStatus(Constants.TASK_DONE);
			}

			
			if(taskPersistenceInterface!=null) taskPersistenceInterface.modifySubTask2DB(subTaskInfo);//子任务入库
			synchronized (taskInfo) {//计算完成数
				//执行数据+1(包括推失败的,对方收到的异步的),完成数要看成功才成。
				TaskInfo.modifyTaskInfoFinish(taskInfo, "success".equals(code));
//				logger.info("【修改子任务的完成状态,并移除】总任务【移除前】有:"+taskInfoMap.size());
//				logger.info("【修改主任务的状态,并可能移除】当前主任务的-总|执|完|线:" + taskInfo.getTotalTask() + "|" + taskInfo.getExecuteTask()+ "|" + taskInfo.getCompleteTask()+ "|" + taskInfo.getOnlineTask());
			}
			
			//del--->当执行数与线上数一样时。线上都完成了。就持久化,但不移除。当与总数一样时,持久化并移除。
			//都执行了就移除,并持久化。但如果不全是线上的,置的状态不一样
			if (taskInfo.getExecuteTask().intValue() == taskInfo.getTotalTask().intValue()) {
				//【移除的情况:】当线上数与总数一样的时候,全完成了。就从内存中移除,并且调外部接口类进行持久化。线下子任务没有超时机制,可能一直接没反馈,由主任务总超时处理。
//				if (taskInfo.getOnlineTask().intValue() == taskInfo.getTotalTask().intValue()  || taskInfo.getExecuteTask().intValue() == taskInfo.getTotalTask().intValue() ) {
				//如果没有异步的任务
				if (taskInfo.getReceivedAsyncTask()==0 ) {
					//主任务状态为:
					boolean isAllSubtaskOk=taskInfo.getCompleteTask().intValue()==taskInfo.getExecuteTask();
					taskInfo.setStatus(isAllSubtaskOk?Constants.TASK_DONE:Constants.TASK_FAILURE);
//					if(taskInfo.getCompleteTask().intValue() == taskInfo.getExecuteTask().intValue()) taskInfo.setStatus(Constants.TASK_FAILURE);
//					taskInfoMap.remove(taskInfo.getTaskId());// 从任务记录中移除
//					if(taskPersistenceInterface!=null) taskPersistenceInterface.modifyTask2DB(taskInfo);//主任务入库
					logger.info("【修子任务的完成状态,全部完成并移除】移除任务id:" + taskInfo.getTaskId());
				}else{
					//【持久化】都移除,线下不适合在内存中长时间放。
					taskInfo.setStatus(Constants.TASK_DOING);
					logger.info("【修子任务的完成状态,只持久化线上部分】任务id:" + taskInfo.getTaskId()+"。此时收到线下任务回复数为:"+taskInfo.getReceivedAsyncTask());
				}
				taskInfoMap.remove(taskInfo.getTaskId());// 从任务记录中移除(线下的不适合长时间放在内存中)
				logger.info("【修改任务的完成状态,全部完成并移除】有完成后移除。总任务【移除后】有:" + taskInfoMap.size());
				if(taskPersistenceInterface!=null) taskPersistenceInterface.modifyTask2DB(taskInfo);//主任务入库
			}

		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			logger.info("【修改子任务的状态,并移除】出错:"+e.toString());
		}
	}
```


#### 其它方法介绍

监听客户端下线时(客户端管理里也有去监听),将其它持有的子任务中标识的客户端置空,并移除重新选择客户端。

```
public synchronized void updateSubTaskInfoByOffline(String clientSessionId)
```


定时任务:清理、失败任务没超时,没超重试次数时的再次执行。

```java
    /**
     * <P>守护线程中定时会清理超时的主任务。(线上子任务等待结果超时在队列里处理,线下超时不处理,由主任务超时处理)</P>
     */
    public void dealTimeoutTask(TaskInfo taskInfo)

    /**
	 * <P>用于处理阻塞对列里的子任务,再扔进线程池。</P>
	 * 1.反复处理再分配的子任务,一定次数后超时。主要是原通讯没返回sessionId,监听移除有问题,所以多试几次。
	 * 每次试如果不能会真正移除不在线的客户端。
	 * 2.处理线上子任务,已经推送出去了,状态发生变化。反复检测是不是超时没有收到反馈。有反馈的会设置子任务状态,就不再入队了。
	 * @author liujun
	 * @date 2018年1月22日 上午9:59:10
	 */
    private class SubTaskRedo implements Runnable
```

此外,还有从持久层加载可以再执行的任务,提供人工控制的接口方法等。

# 4. 中间层的设计

## 4.1 作为客户端的核心处理类AsClientContainer

这个对接前面提到的SS。

```java
public class AsClientContainer {

	private static final Logger logger = Logger.getLogger(AsClientContainer.class);

	public static final long THEARTBEAT_INTERVAL = 3 * 1000L;
	
	private static JSONArray REMORTSERVERS=new JSONArray();
	
	public static Map<String,JSONObject> REMORTSERVERS_MAP =new HashMap<String,JSONObject>();

	/*子任务存放*/
//	public volatile static Map<String, JSONObject> subTaskInfoMap = new ConcurrentHashMap<String, JSONObject>();
	/**使用阻塞队列,放置所有要处理的子任务*/
	public volatile static BlockingQueue<JSONObject> subTaskInfoQueue = new LinkedBlockingQueue<JSONObject>();
	/**
	 * 需要人工干预的子任务池
	 */
	public volatile static Map<String, JSONObject> subTaskInfoMap = new ConcurrentHashMap<String,JSONObject>();
	/*同步对象存放*/
	public volatile static Map<String, PushFuture<CommonReturnData>> syncKey = new ConcurrentHashMap<String, PushFuture<CommonReturnData>>();
	
	/**用户单例线程锁*/
	private static Boolean lockSigleton = true;
	/**用户单例对象*/
	private static AsClientContainer clientContainer;

	public TaskConsumer taskConsumer = new TaskConsumer();
	
	public TimeoutSubTask timeoutSubTask = new TimeoutSubTask();
	
	public OfflineSendInterface offlineSendInterface;
	
	public DateFormatInterface dateFormatInterface;
	
	/** 失败的任务用线程池 */
	private static ExecutorService executor = Executors.newCachedThreadPool();
	
	private ScheduledExecutorService executorTimeout = Executors.newScheduledThreadPool(1);
	
	public static AtomicLong                 subTaskNum                = new AtomicLong();

```


任务消费,对重试任务,线上任务,人工任务都分别进行处理,人工任务的处理由外部提供处理类,真正实现由dubbo完成。

```java
	private class TaskConsumer implements Runnable {
		@Override
		public void run() {
			while (true) {
				try {
					//因为这个队列中都是出问题的子任务,所以要等待一下处理。
					Thread.sleep(1500);// 调节频率,过快容易撑死~~
//					logger.debug("【子任务队列】的任务数1:" + taskNum);
					JSONObject subtaskInfo=subTaskInfoQueue.take();
					logger.debug("【子任务队列消费】取出子任务JASON:"+subtaskInfo.toString());
					
					String subTaskType=subtaskInfo.containsKey("subTaskType")?subtaskInfo.getString("subTaskType"):null;
					//!!!!!!检查这个子任务是还否可以复制之前的结果,如果可以就复制出来,返回一个成功的结果。
					//这里交冯实现的接口,由于工作变动,还没出来。
					
//					logger.debug("【子任务队列】的任务数2:" + subTaskInfoQueue.size());
					//如果重试了50次或者超时了5分钟,那么子任务失败吧
					long reDoTime=new Date().getTime()-subtaskInfo.getLong("startDate");
					logger.debug("reDoTime:"+reDoTime+"。testnum:"+subtaskInfo.getInt("testNum"));
					if(subtaskInfo.getInt("testNum")>Integer.parseInt(MiddleConfig.getFailureSubTaskMaxRetryTimes()) || (reDoTime>Long.parseLong(MiddleConfig.getFailureSubTaskMaxRetryTimes())) ){
						logger.debug("【子任务队列消费】子任务超时失败:"+subtaskInfo.getString("subTaskId"));
						logger.debug("【子任务队列消费】子任务超时失败,尝试次数为:"+subtaskInfo.getInt("testNum"));
						
						AsClientContainer.subTaskInfoMap.remove(subtaskInfo.getString("subTaskId"));
						
						//通用处理完成或者失败的子任务
						Thread.sleep(1000);// 调节频率,过快容易撑死~~
						String isAsync=subtaskInfo.containsKey("isAsync")? subtaskInfo.getString("isAsync"):null;
						//1.【推失败了,如果是异步的,就发消息给服务端】
						if("async".equals(isAsync) || !Constants.TASK_TYPE_ONLINE.equals(subTaskType)){
//							AsClientContainer.sendTaskAsyncResult2Server(asyncServerCode,finishJason);
							//如果异步调用失败。
							logger.debug("【推送异步任务】超时了,发消息给服务器");
							JSONObject finishJason=new JSONObject();
							finishJason.put("code", "failure");
							finishJason.put("msg", "OFFLINE_RPC_FAIL中间层任务调用C端失败");
							finishJason.put("data", subtaskInfo);//这里面有ip/port用于异步。
							
							
							//如果是线下的推送或者调用失败了,只持久化到本地,再重试。或者超时。不可以迅速返回失败的。
							sendTaskAsyncResult2Server(finishJason);
							return;

						}
						//2.【如果是线上任务推送失败,设置同步等待对象。】
						JSONObject finishJason=new JSONObject();
						finishJason.put("code", "failure");
						finishJason.put("msg", "中间层任务推送失败");
						JSONObject finishData=new JSONObject();
						finishData.put("taskId", subtaskInfo.getString("taskId"));
						finishData.put("subTaskId", subtaskInfo.getString("subTaskId"));
						finishData.put("remark", "重试了"+subtaskInfo.getInt("testNum")+"次,用时"+reDoTime+"ms");
						finishJason.put("data", finishData);
						PushFuture<CommonReturnData> responseFuture = AsClientContainer.syncKey.get(subtaskInfo.getString("subTaskId"));
						if(responseFuture!=null){
							CommonReturnData response=new CommonReturnData();
							response.setCode("failure");
							response.setMsg("任务超时失败");
							response.setData(finishData);
							responseFuture.setResponse(response);
							logger.debug("【推送任务任务】超时了,设置同步对象的返回值");
						}
						else{
							logger.debug("【推送任务任务】设置超时时,同步对象已经被移除。");
						}
					}
					else//如果是正常处理子任务
					{
						//如果非线上任务,就走外部接口(注入的实现类)发出去(实现类会持久化,再发的)
						logger.debug("subTaskType:"+subTaskType+"。offlineSendInterface:"+offlineSendInterface);
						if(!Constants.TASK_TYPE_ONLINE.equals(subTaskType)){
							try {
								if(offlineSendInterface!=null){
									offlineSendInterface.sendOfflineQuery(subtaskInfo);
								}else{
									logger.warn("【找不到外部(非线上子任务)调用的接口】");
									throw new SurveyException("找不到外部(非线上子任务)调用的接口实现类");
								}
								logger.info("【推送任务任务】成功推送非线上任务到C端!");
							} catch (SurveyException e) {
								// TODO Auto-generated catch block
								e.printStackTrace();
								logger.warn("【推送任务任务】推送非线上任务到C端失败!");
								ReDoTask reDoTask = new ReDoTask(subtaskInfo);
								executor.submit(reDoTask);
							}
						} 
						else //如果是线上的,就推送出去。
						{
							SurveyClient surveyClient = AsServerContainer.getClientByUserRankAndClinetLever(0);
							// 如果找到策略的客户端
							if (surveyClient != null && surveyClient.getSessionId() != null) {
								// 开始推送
								String body = JsonUtils.toString(subtaskInfo);
								String clientSessionId = surveyClient.getSessionId();
								logger.info("【推送任务任务】推送目标sissionId:" + clientSessionId);
								boolean bln = ServerPushHandler.pushBySessionId(clientSessionId, "assignTaskToClient", body, new MiddlePushClientTaskCallback());
								logger.info("【推送任务任务】bln:" + bln);
								if (bln) {
									logger.info("【推送任务任务】成功!");
									// 推成功了,但一直不返回,也是个问题。不过同步对象会被移除的。

								} else {
									logger.warn("【推送任务任务】推送失败!");
									ReDoTask reDoTask = new ReDoTask(subtaskInfo);
									executor.submit(reDoTask);
								}
							} else {// 没有可用的客户端
								logger.info("【推送任务任务】bln:没有可用的客户端,直接失败返回。次数:" + subtaskInfo.getInt("testNum"));
								ReDoTask reDoTask = new ReDoTask(subtaskInfo);
								executor.submit(reDoTask);
							}
						}
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		}
	}
```


## 4.2 作为服务的核心处理类AsServerContainer

```java
public class AsServerContainer {
	private static final Logger logger = Logger.getLogger(AsServerContainer.class);

	/** 实时总任务信息(taskid---TaskInfo(SubTaskInfoMap)) */
	public static volatile Map<String, SurveyClient> onlineClientMap=new ConcurrentHashMap<String,SurveyClient>();

	public boolean isMiddlewearStarted=false;
	
	/** 任务在内存中允许的最大存放数 */
//	private static Integer maxTaskMapSize=Integer.MAX_VALUE;

	/**
	 * 可接入的类型列表,目前一个中间层只支持一种类型的接口接入。(同一类型的app都一样,不同的不一样)
	 */	
	private static List<ClientApp> clientAppList=new ArrayList<ClientApp>();
	
//	static{
//		//设置有效的客户端
//
//	}
	
	/**
	 * 失败子任务重复处理次数、与超时处理的时间
	 */
	public static int maxReDealSubTaskTimes=20;
	/**
	 * 失败子任务重复处理次数、与超时处理的时间
	 */
	public static long maxTimeoutSubTaskTime=60000L;
	
	/**主任务超时时间*/
	public static long maxTimeoutTaskTime=150000L;

	/**
	 * 一个配置的client下的实时子任务信息(sessionId-List<SubTaskInfo>)
	 * 用sessionId方便应对底层的上下线变化。中间件的事件只能得到sessionId,没有ClientCode。
	 */
//	public volatile static Map<String, List<SubTaskInfo>> clientSubTaskInfoMap = new ConcurrentHashMap<String, List<SubTaskInfo>>();

	/**用户单例线程锁*/
	private static Boolean lockSigleton = true; 
	private static AsServerContainer asServerContainer;
```




# 5. 客户端的设计

## 5.1 核心类的设计

属性如下,主要功能有子任务执行与客户端业务心跳。

```java
/**
 * 背调中间层容器-管理通讯层并处理背调任务
 * @author liujun
 */
public class ClientContainer {
	private static final Logger logger = Logger.getLogger(ClientContainer.class);

	public static final long THEARTBEAT_INTERVAL = 3 * 1000L;
	/**appKey-同类的客户端相同*/
	private String appkey;
	/**appSec-同类的客户端相同*/
	private String appSecret;
	/**客户端标识Code*/
	private String clientCode;
	/**所连接服务器IP*/
	private String serverIP;
	/**此客户端的权重*/
	private String weight;
	/**通讯层客户端*/
	Client client = null;
	/**子任务处理接口对象*/
	ApiInvorkerInterface apiInvorkerInterface;
	/**监听器-监听通讯层客户端状态*/
	ClientStatusListener clientStatusListener;
	/**客户端上报状态传输对象*/
	ClientRealData clientRealData;
	/**客户端是否连接状态标识*/
	private boolean isConnected = false;
	private Object lock=new Object();
//	ScheduledExecutorService service = Executors.newScheduledThreadPool(1);//不需要线程池,只要一个循环的守护线程
	/**用户单例线程锁*/
	private static Boolean lockSigleton = true;
	/**用户单例对象*/
	private static ClientContainer clientContainer;
```


## 5.2 背调任务执行

每一个包装的Web客户端,要实现这个接口来做具体任务。

```java
/**
 * 调用第三方功能的接口,请各个第三方接口应用实现些接口
 * <P></P>
 * @author liujun
 * @date 2018年1月12日 下午5:10:02
 */
public interface ApiInvorkerInterface {
	/**
	 * 根据请求参数与所选择的一组app产生任务并处理
	 * <P></P>
	 * @param paras
	 * @param appCode
	 */
	public CommonReturnData dealSubTaskByApi(JSONObject taskData) throws SurveyException;
}
```


# 6.  其它

## 6.1 与Web应用的整合

外部Web应用提供数据持久化与具体任务执行等接口的实现。实现都是spring容器中的类,由一个统一管理的@Componet类@Autowired这些实现,在其afterPropetiesSet方法时启动核心业务组件,并按提供的接口注入实现类。当然也可以考虑都交给spring管理。

## 6.2 子任务相关Web工程

近20个客户端我做了一个例子工程,后来交办出去发现在clone工程,于是我要求只用一个工程,各种实现类通过配置的不同进行加载。

## 6.3 反思


  • 除了前面提到的通讯层更加独立外,细节问题不少,比如线程池使用不够规范。appClient与缓存map关系,很多用享元模式的。
  • 有些参数不完全确定就用jason。一些参数要根据测试从外部配置进来。
  • 内部系统的启动停止最好由smartCycle控制,这里用afterPropetiesSet只管启动,没有优雅的停止。
  • 有些优化还需要相关应用与人员配合才能整体优化,比如通讯层,配置服务。
  • 如果要高可用,还有非常多的工作要做。
  • 后来发现我们的开发有微服务的味道,有服务注册,有策略,有远程消息调用,但由于没时间研究相关资料,很多没考虑的,比如注册没独立,有单点问题,链路跟踪也没有,不过有任务监控,另只能手工扩容。
  • 公司还有另一个产品,实际上有通用的部分,比如用户,产品,订单等,值得统筹考虑,合理划分微服务。


虽然很多问题,但运行稳定,可以顺利的工作。由于手上还有其它任务,比如企业服务,调查发起,定单处理,被查人同意,根据结果费用核算等应用与功能处理。所以除了功能变更,没真正进行重构过。
0
0
分享到:
评论

相关推荐

    风的回忆留言板

    这个程序作为一个基础的交互式应用,它涵盖了Web开发中的多个核心概念和技术,对于想要踏入C# Web开发领域的新人来说,是一个不错的起点。 C#是一种面向对象的、现代的编程语言,由微软公司开发,主要用于构建...

    基于springboot的班级回忆录源码数据库.zip

    这个项目的核心目标是构建一个能够记录并展示班级回忆的平台,包括照片、活动记录、个人故事等,便于同学们回顾和分享共同的校园时光。 首先,SpringBoot作为项目的基石,简化了Spring应用的初始搭建和配置过程。它...

    [聊天留言]风的回忆留言本源码_fengdehuiyi(ASP.NET源码).rar

    【风的回忆留言本源码】是一款基于ASP.NET技术开发的在线留言系统,适用于个人网站、小型社区或者企业网站,用于收集用户反馈和互动交流。ASP.NET是微软公司推出的一种Web应用程序框架,它构建在.NET Framework之上...

    山东大学2015年软件工程试题(回忆)

    体系结构设计关注于高层次的决策,例如系统如何划分成组件、组件之间的交互方式等。 #### 二、判断题 1. **结构化程序设计方法能改善程序结构,提高程序的运行效率** - 正确。结构化程序设计通过使用模块化、自顶...

    2012软件测试试题(回忆版)1

    它的核心组件包括测试类、测试方法、断言和注解。测试类包含一个或多个测试方法,每个方法测试特定的功能。断言用于检查预期结果是否与实际结果相符,注解如@Test标记测试方法,@Before和@After分别定义在每个测试...

    8-1计算机软件基础(8系)2021秋试卷(回忆版)1

    在2021秋试卷(回忆版)中,涉及的知识点主要包括矢量图、递归法、数据完整性以及软件开发过程。以下是对这些知识点的详细解释: 1. **矢量图**:矢量图是一种基于数学对象(如点、线、曲线)的图像类型,它通过...

    基于ssm+vue同学录网站.zip

    这个项目的核心目标是为用户提供一个平台,方便他们记录和分享与同学们的珍贵回忆,包括照片、事件、联系信息等。下面将详细介绍这个项目涉及的技术和实现细节。 首先,SSM框架是Java Web开发中常见的三层架构解决...

    Spring+SpringMVC+Mybatisjar(完整)

    Spring框架是整个SSM的核心,它是一个全面的后端开发框架,提供依赖注入(DI)、面向切面编程(AOP)、事务管理等核心功能。Spring通过容器来管理对象的生命周期和对象间的依赖关系,使得代码更加解耦和易于测试。...

    校友录系统(ASP.NET 3.5实现).

    其次,Web Forms是ASP.NET的一个核心组件,它提供了丰富的控件库和事件驱动编程模型。在这个校友录系统中,开发者可能会使用诸如GridView、DetailsView等控件来显示和编辑校友信息,使用Button、DropDownList等控件...

    ASP.NET-[交友会员]Asp.net校友录(同学录)系统源码.151.zip

    在“ASP.NET-[交友会员]Asp.net校友录(同学录)系统源码”中,核心是建立一个会员交互的平台。这样的系统通常包括用户注册、登录、个人信息管理、好友添加、消息发送等功能。用户可以通过系统找到老同学,建立和维护...

    校友录系统(ASP.NET 3.5实现)

    这个系统的核心目标是为校友提供一个交流、分享信息和回忆的场所,同时帮助学校维护校友数据库,促进校友间的互动。 ASP.NET 3.5是.NET Framework的一个重要版本,它在2.0和3.0的基础上进行了诸多改进和扩展。该...

    一个不错的.NET同学录

    ASP.NET提供了丰富的功能和组件,简化了Web开发流程,提高了开发效率。 2. 同学录系统架构 该.NET同学录项目可能采用了三层架构设计,包括表示层(UI)、业务逻辑层(BLL)和数据访问层(DAL)。这种架构模式使得...

    回忆应用

    JavaScript作为前端开发的核心语言,它允许开发者在浏览器环境中创建交互式的用户界面,实现动态内容更新,以及处理用户输入的数据。 在JavaScript的世界里,构建这样的应用可能涉及到以下几个重要的知识点: 1. *...

    SpringMyBatis02.zip

    MyBatis 的核心组件包括 XML 配置文件、映射器接口(Mapper Interface)以及映射器 XML 文件。 3. **Spring-MyBatis 整合**:整合 Spring 和 MyBatis 主要涉及以下几个步骤: - **添加依赖**:在项目的 Maven 或 ...

    2019软考高级系统架构案例题和论问题回忆版(无解析).txt

    通过以上分析可以看出,2019年软考高级系统架构案例题和论问题主要考察了系统架构模式、UML图、分布式系统以及数据库技术等方面的知识点,考生需要掌握这些核心概念和技术的应用才能顺利通过考试。

    Struts2详细分类流程API文档

    2. **过滤器Dispatcher**:Struts2的核心组件FilterDispatcher拦截请求,根据配置文件(struts.xml)确定Action的映射。 3. **Action Mapping**:基于请求的URL和方法,框架找到对应的Action类及其执行方法。 4. **...

    SSH最新版完整API文档(Struts2.2.3.1+Hibernate4.0+Spring3.0.5)

    总的来说,这个SSH最新版完整API文档集合是Java开发者的宝贵资源,它涵盖了Web应用开发的关键组件,帮助开发者快速理解和使用这三个框架,提高开发效率。无论是初学者还是有经验的开发者,都能从中受益,深入了解SSH...

    ASP.NET 同学录

    ASP.NET是微软公司推出的Web应用程序框架,它构建在.NET Framework之上,提供了丰富的功能和组件,支持快速开发动态网站、Web服务和应用程序。ASP.NET的核心优势包括强大的服务器控件、自动状态管理、内置的安全性...

    struts2整合包

    Struts2的核心特性包括拦截器、模板引擎和强大的动作调度,这些使得开发者能够构建结构清晰、可维护性高的Web应用。 Hibernate则是一个对象关系映射(ORM)框架,它简化了数据库操作,将Java对象与数据库表之间的...

    acquanym:由ei8htideas为UQDNUI夏季创新项目开发的熟人记住应用程序

    通过使用此应用,用户可以记录与熟人的交互细节,例如姓名、面孔、共同朋友、共享经历等,从而在未来的社交场合中更容易识别和回忆起这些人。 这款应用的核心技术是基于Java编程语言构建的。Java是一种广泛使用的...

Global site tag (gtag.js) - Google Analytics