`
woodding2008
  • 浏览: 291109 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

广告调度中心

 
阅读更多

背景介绍

      一支广告的投放通常有很多的限定条件:地域、时段、频次、广告位、轮播顺序、用户标签【性别、年龄、爱好】。广告主的期望有良好的投放效果,广告平台期望有良好的投放量以及效果。假设媒体资源足够,那我们应该需要计算出每一分钟应该完成的投放量,调度就是为了完成这个目标。

 

调度需要的数据

  • 容量曲线
  • 分钟级别的排期
  • 总的投放量
  • 已经完成的数据量

 

计算公式

  • 本分钟应该完成的量=(总的投放量-已完成量)*容量因子
  • 容量因子=当前分钟容量/剩余排期容量 [计算过程数据完全依赖容量曲线]

 

 

已经完成的量

      获取这个数据的方式有很多种方式,我们这里通过广告投放机器提供的接口获得,调度中心只按照自己的策略获取即可,由于投放机器有很多我们需要并发获取数据来加快整个调度的过程。通过CountDownLatch来控制是否完成了所有机器都的收账。

 

收账代码

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import net.sf.json.JSONArray;
import net.sf.json.JSONException;
import net.sf.json.JSONObject;

import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;

import com.javagc.entity.ErrorStackMessage;
import com.javagc.entity.MergePolicy;
import com.javagc.context.CollectEffectDataContext;
import com.javagc.context.ServerGroupContext.GROUPS;
import com.javagc.entity.CollectEffectData;
import com.javagc.entity.CollectRunStatus;
import com.javagc.entity.CollectTaskDTO;
import com.javagc.entity.CurrCastData.DELIVER_TYPE;
import com.javagc.service.IEffectCollectService;
import com.javagc.util.DispatcherConstants;
import com.javagc.util.DispatcherUtil;

/**
 * 任务 stevenDing<br>
 */
public class CollectTask implements Runnable {

	private static final Logger logger = DispatcherUtil.getLog();
	
	private CollectEffectData 			  collectEffectData;
	private Map<String, CollectRunStatus> runStatusMap;
	private CountDownLatch 		 latch;
	private IEffectCollectService collect;
	private CollectTaskDTO 		 taskDTO;
	private GROUPS  			 group;

	// shared data
	private int 		generateJsonCost;
	private String 		result;
	private boolean 	runStatus;
	private long 		endTime;
	private List<Long> 	reTryRunTimeList = new ArrayList<Long>();
	
	/**
	 * 
	 * @param taskDTO
	 * @param effectData
	 * @param runLogMap
	 * @param latch
	 * @param collect
	 */
	public EffectCollectTask(	CollectTaskDTO 		taskDTO, 
								CollectEffectData 	collectEffectData,
								Map<String, CollectRunStatus> runStatusMap, 
								CountDownLatch 			latch,
								IEffectCollectService 	collect,
								GROUPS  group) {
		
		this.taskDTO 			= taskDTO;
		this.collectEffectData 	= collectEffectData;
		this.runStatusMap 		= runStatusMap;
		this.latch 				= latch;
		this.collect 			= collect;
		this.group				= group;
	}

	public void run() {

		try {
			// retry it
			while (true) {
				// invoke effect data
				long retryBegin = System.currentTimeMillis();
				try {
					invokeEffectData();
					runStatus = true;
				} catch (Throwable e) {
					runStatus = false;
					logger.error(e.getMessage(), e);
					ErrorStackMessage msg = new ErrorStackMessage(e, "", new MergePolicy(false,true));
					DispatcherConstants.collectErrorStackCollector.produce(msg);
				} finally {
					reTryRunTimeList.add(System.currentTimeMillis() - retryBegin);
				}
				
				//任务剩余时间
				long remainingTime = (taskDTO.getBeginTime() + taskDTO.getTimeout()) - System.currentTimeMillis();

				//成功或剩余时间不足则退出收账
				if (isRunStatus()|| 
					remainingTime < DispatcherConstants.COLLECTOR_BREAKTIME) {
					break;
				}else{
					try{
						Thread.sleep(DispatcherConstants.COLLECTOR_RETRYSLEEPMS);
					}catch(Exception e){}
				}
			}
		} finally {
			try {
				if (taskDTO.getClient() != null) {
					taskDTO.getClient().getConnectionManager().shutdown();
				}
			} catch (Throwable e) {
				logger.error(e.getMessage(), e);
				ErrorStackMessage msg = new ErrorStackMessage(e, "", new MergePolicy(false,true));
				DispatcherConstants.collectErrorStackCollector.produce(msg);
			}
		}

		try {
			// invoke succ
			if (isRunStatus()) {
				parseEffectData();
				CollectEffectDataContext.I.updateCollectEffectData(group,collectEffectData);
				runStatus = true;
			}
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
			ErrorStackMessage msg = new ErrorStackMessage(e, "", new MergePolicy(false,true));
			DispatcherConstants.collectErrorStackCollector.produce(msg);
			runStatus = false;
		} finally {
			try {
				endTime = System.currentTimeMillis();
				CollectRunStatus runStatus = wrapperRunStatus();
				// check main thread status
				if (!collect.isCollectCompleted()) {
					runStatusMap.put(taskDTO.getIp(), runStatus);
				}
			} catch (Throwable e) {
				logger.error(e.getMessage(), e);
				ErrorStackMessage msg = new ErrorStackMessage(e, "", new MergePolicy(false,true));
				DispatcherConstants.collectErrorStackCollector.produce(msg);
			}
			latch.countDown();
		}
	}

	protected void invokeEffectData() throws ClientProtocolException, IOException {
		HttpResponse response = taskDTO.getClient().execute(taskDTO.getGet());
		HttpEntity entity = response.getEntity();
		if (entity != null) {
			result = EntityUtils.toString(entity, "UTF-8");
		}
	}

	/**
	 * json --> {"cost" : 34,"s" : [{"id" : 111, "t" : 58},{"id" : 222,"t" : 66}],
	 * "c" : [{"id" : 10, "t" : 23},{"id" : 15,"t" : 39}],"vs" : [{"id" : 20, "t" : 13},{"id" : 45,"t" : 22}],
	 *   "ps" : [{"id" : 30, "t" : 42},{"id" : 37,"t" : 18}]} 
	 */
	protected void parseEffectData() throws Exception {
		
		JSONObject object = JSONObject.fromObject(getResult());
		// cost time
		generateJsonCost = object.getInt("cost");		
		parseEffectData(DELIVER_TYPE.CPM, object, "s");
		parseEffectData(DELIVER_TYPE.CPC, object, "c");
		parseEffectData(DELIVER_TYPE.CPV, object, "vs");
		parseEffectData(DELIVER_TYPE.CPP, object, "ps");
	}
	
	/**
	 * 
	 * @param type
	 * @param jsonArray
	 */
	protected  void parseEffectData(DELIVER_TYPE type,JSONObject object,String nodeName){
		
		Map<Long, Long> effectMap = new HashMap<Long, Long>();
		try{
			//曝光数据
			JSONArray jsonArray = object.getJSONArray(nodeName);
			//效果数据
			for (int i = 0; i < jsonArray.size(); i++) {
				JSONObject data = (JSONObject) jsonArray.get(i);
				try{
					if(isValidValues(data.getString("t"))){
						long showTimes = data.getLong("t");
						if (showTimes >= 0) {
							effectMap.put(data.getLong("id"), showTimes);	
						}
					}
				}catch(JSONException e){
					StringBuffer buffer = new StringBuffer();
					buffer.append(type+" Effect Data, JSONException, jsonObject=");
					buffer.append(data.toString()).append(" , ip=").append(taskDTO.getIp());
					logger.error(buffer.toString(),e);
					ErrorStackMessage msg = new ErrorStackMessage(e, buffer.toString(), new MergePolicy(true,true));
					DispatcherConstants.collectErrorStackCollector.produce(msg);
				}
			}
	
			if (!collect.isCollectCompleted()) {
				// put result to map
				collectEffectData.putEffectData(taskDTO.getIp(), effectMap,type);
			}
			
		}catch(Exception e){
			if(nodeName.equals("s") || nodeName.equals("c")){
				StringBuffer buffer = new StringBuffer();
				buffer.append(type+" Effect Data, JSONException, ps=null");
				logger.error(buffer.toString(),e);
				ErrorStackMessage msg = new ErrorStackMessage(e, buffer.toString(), new MergePolicy(true,true));
				DispatcherConstants.collectErrorStackCollector.produce(msg);
			}
		}
	}
	
	/**
	 * 包装运行状态
	 * @return
	 */
	protected CollectRunStatus wrapperRunStatus() {
		CollectRunStatus runStatus = new CollectRunStatus();
		runStatus.setIp(taskDTO.getIp());
		runStatus.setGenerateJsonCost(getGenerateJsonCost());
		runStatus.setThreadRunTime(getEndTime() - taskDTO.getBeginTime());
		runStatus.setRunStatus(isRunStatus());
		runStatus.setReTryRunTimeList(getReTryRunTimeList());
		return runStatus;
	}
	
	/**
	 * 检查数据是否有效
	 * @param content
	 * @return
	 */
	private boolean isValidValues(String content){
		
		if(content==null || content.length()==0 || content.getBytes().length>12){
			return false;
		}
		return true;
	}
	
	public List<Long> getReTryRunTimeList() {
		return reTryRunTimeList;
	}

	public long getEndTime() {
		return endTime;
	}

	public boolean isRunStatus() {
		return runStatus;
	}

	public String getResult() {
		return result;
	}

	public int getGenerateJsonCost() {
		return generateJsonCost;
	}
}

 

主进程控制

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.http.Header;
import org.apache.http.HeaderElement;
import org.apache.http.HttpEntity;
import org.apache.http.HttpException;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.client.HttpClient;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.params.CoreConnectionPNames;
import org.apache.http.protocol.HttpContext;
import org.slf4j.Logger;

import com.javagc.context.ServerGroupContext;
import com.javagc.context.ServerGroupContext.GROUPS;
import com.javagc.entity.CollectRunStatus;
import com.javagc.entity.CollectEffectData;
import com.javagc.entity.CollectTaskDTO;
import com.javagc.task.EffectCollectTask;
import com.javagc.util.DispatcherConstants;
import com.javagc.util.DispatcherUtil;

/**
 * 收账服务<br>
 * 1、一组服务器收账使用最大时长,@see DispatcherConstants.COLLECTOR_TOTALTIMEOUT
 * @author stevending<br>
 *
 */
public class CollectService implements ICollectService {

	private static final Logger logger = DispatcherUtil.getLog();
	
	/**
	 * 分组信息
	 */
	private GROUPS groupNum;
	/**
	 * 收账结果日志
	 */
	private Map<String,CollectRunStatus> runStatusMap = null;
	/**
	 * 结果数据
	 */
	private CollectEffectData 		effectData 	= null;
	/**
	 * 检测是否完成收账
	 */
	private CountDownLatch 	latch 		= null;
	/**
	 * 收账完成状态 true=完成、false=未完成
	 */
	private AtomicBoolean	collectCompleted = new AtomicBoolean(false);
	
	/**
	 * 构造方法
	 * @param groupNum
	 */
	public EffectCollectService(GROUPS groupNum){
		this.groupNum = groupNum;
	}
	
	
	@Override
	public CollectEffectData doCollect() throws InterruptedException{
		
		long beginTime = System.currentTimeMillis();
		try{
			//exec task
			exeTask();
		} finally{
			collectCompleted.set(true);
			//write run status
			writeRunStatus();
		}
		logger.info(groupNum+" Total runTime:"+(System.currentTimeMillis()-beginTime) + " ms");
		return effectData;
	}
	
	/**
	 * 执行任务
	 * @throws InterruptedException
	 */
	private void exeTask() throws InterruptedException{
		
		long beginTime		= System.currentTimeMillis();
		List<String> ips 	= ServerGroupContext.I.getGroupServerIps(groupNum);
		latch		=	new CountDownLatch(ips.size());
		runStatusMap 	= 	new ConcurrentHashMap<String,CollectRunStatus>();
		effectData 	=	new CollectEffectData();
		
		for(int i=0;i<ips.size();i++){
			CollectTaskDTO staskDTO = new CollectTaskDTO( getHttpClient(),
														getHttpGet(ips.get(i),DispatcherConstants.COLLECTOR_PORT),
														ips.get(i),
														DispatcherConstants.COLLECTOR_TOTALTIMEOUT,
														beginTime);
			CollectTask stask = new CollectTask(staskDTO,effectData,runStatusMap,latch,this,groupNum);
			Thread st = new Thread(stask);
			st.setDaemon(true);
			st.start();
		}
		//等待收账完成
		latch.await(DispatcherConstants.COLLECTOR_TOTALTIMEOUT, TimeUnit.MILLISECONDS);
		
		//TODO 清理未完成的线程
	}
	
	/**
	 * 记录运行状态
	 */
	protected void writeRunStatus(){
		
		StringBuffer logBuffer = new StringBuffer("收账过程日志\n");
		List<String> ips = ServerGroupContext.I.getGroupServerIps(groupNum);
		for(int i=0;i<ips.size();i++){
			String ip = ips.get(i);
			if(runStatusMap.get(ip) == null){
				CollectRunStatus consumerStatus = new CollectRunStatus();
				consumerStatus.setIp(ip);
				consumerStatus.setRunStatus(false);
				consumerStatus.setThreadRunTime(DispatcherConstants.COLLECTOR_TOTALTIMEOUT);
				runStatusMap.put(ip, consumerStatus);
			}
			logBuffer.append(runStatusMap.get(ip).toString()).append("\n");
		}
		
		logger.info(logBuffer.toString());
	}
	
	/**
	 * 获取httpClient
	 * @return
	 */
	protected HttpClient getHttpClient(){
		
		DefaultHttpClient httpclient = new DefaultHttpClient();
		
		httpclient.addRequestInterceptor(new HttpRequestInterceptor() {
            public void process(final HttpRequest request,final HttpContext context) throws HttpException, IOException {
                if (!request.containsHeader("Accept-Encoding")) {
                    request.addHeader("Accept-Encoding", "gzip");
                }
            }
        });

        httpclient.addResponseInterceptor(new HttpResponseInterceptor() {
            public void process(final HttpResponse response,final HttpContext context) throws HttpException, IOException {
                HttpEntity entity = response.getEntity();
                if (entity != null) {
                    Header ceheader = entity.getContentEncoding();
                    if (ceheader != null) {
                        HeaderElement[] codecs = ceheader.getElements();
                        for (int i = 0; i < codecs.length; i++) {
                            if (codecs[i].getName().equalsIgnoreCase("gzip")) {
                                response.setEntity(new GzipDecompressingEntity(response.getEntity()));
                                return;
                            }
                        }
                    }
                }
            }
        });
        
        httpclient.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,DispatcherConstants.COLLECTOR_CONNTIMEOUT);
        httpclient.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT,DispatcherConstants.COLLECTOR_READTIMEOUT);
		
		return httpclient;
	}
	
	protected HttpGet getHttpGet(String ip,int port){
		StringBuffer buffer = new StringBuffer();
		buffer.append("http://").append(ip);
		buffer.append(":").append(port);
		buffer.append(DispatcherConstants.COLLECTOR_URL);
		return new HttpGet(buffer.toString());
	}
	
	@Override
	public boolean isCollectCompleted() {
		return collectCompleted.get();
	}

}

 

 

分享到:
评论

相关推荐

    海信智能化公交调度系统.pdf

    1. **实时信息交互**:公交车载信息终端与调度中心通过GPRS实时交换数据,提供乘车参考信息,如到站时间、道路状况等,同时在车内向乘客发布。 2. **公交电子信息站牌**:站牌可显示车辆位置、到站时间、交通情况、...

    公交车智能调度

    雅迅公司的解决方案涵盖了系统设计的各个方面,包括系统的目标、原则、框架结构、业务模块、网络结构以及中心系统的架构。该系统具备多种特色功能,如车载DVR(数字视频录像机)功能,用于记录车辆行驶过程中的视频...

    广告管理系统C#实现.net平台

    数据库是整个系统的数据存储中心,系统采用了关系型数据库管理系统,如SQL Server或MySQL,用于存储广告信息、预约记录、员工资料等大量数据。数据的增删改查操作高效稳定,且支持数据备份和恢复,确保数据安全。...

    基于DGPS_GPRS的智能公交调度系统设计.pdf

    公交车辆的位置信息通过GPRS模块发送回调度中心,调度中心利用GIS系统实时监控车辆位置和速度,以便进行有效调度。 总结: 基于DGPS_GPRS的智能公交调度系统利用先进的定位和通信技术,提高了公交服务的精度和效率...

    电信设备-广告机及广告信息显示方法.zip

    广告机主要用于公共场所,如购物中心、地铁站、机场等,通过高清晰度的显示屏播放动态或静态的广告内容,吸引人们的注意力,实现商业信息的有效传播。其工作原理涉及到图像处理、网络通信、多媒体播放等多个技术领域...

    公交智能调度系统功能解决方案.docx

    行车监控是系统的重要组成部分,通过实时监控公交车辆的行驶状态,可以及时掌握车辆的位置、速度、行驶路线等信息,有助于调度中心对公交线路进行有效管理。用户界面清晰直观,方便操作人员监控和调度。 智能调度...

    基于北斗的出租车调度系统的设计实现及问题研究

    系统通过车载北斗系统获取车辆的精确位置信息,通过车载平台传递至调度分配中心,中心在电子地图上直观展示车辆分布,从而进行安全、合理的调度。 该系统具备以下主要功能: 1. **出租车北斗安全监控系统**:24...

    行业分类-设备装置-多媒体广告的播放方法、装置与系统.zip

    例如,在购物中心,大型LED屏幕播放的绚丽广告能够吸引顾客驻足;在公共交通工具上,车载显示屏播放的广告能够覆盖大量流动人群;在电影院,预映广告则为观众带来丰富的观影体验。 在设备装置方面,多媒体广告的...

    2022年浙科物流管理模拟教学软件实验报告cjh.doc

    在实验过程中,学生分为两人一组,一人扮演两个角色,共同协作,体验总经理、调度中心、仓库中心和运输中心的不同职责,实现理论与实践的结合。 1. **总经理的角色与任务** - **订单受理**:总经理负责接受发货单...

    一种基于ARM的多媒体广告机.pdf

    一种基于ARM的多媒体广告机是一种智能的、现代化的广告展示设备,主要利用先进的微...此外,这种广告机的可扩展性和灵活性使得其能够在各种场景下应用,如购物中心、公共交通工具、办公楼宇等,满足不同环境和需求。

    电信设备-背式移动广告终端.zip

    这种设备通常安装在人流密集的公共场所,如购物中心、地铁站、公交站等,利用其背面的大屏幕展示各种动态或静态广告内容,吸引过往行人的注意力。 在《背式移动广告终端.pdf》这份资料中,可能会详细介绍该设备的...

    GPS车载实时异地监控调度系统

    2. **电话叫车服务**:通过调度中心的电话系统,可以为乘客提供便捷的叫车服务,减少空驶率,提高整体运营效率。 3. **紧急报警功能**:安装在车辆内部的报警开关一旦被触发,会立即向监控中心发送报警信号及车辆...

    行业文档-设计装置-一种电梯多媒体广告机.zip

    电梯多媒体广告机是一种创新的商业展示设备,常用于公共场所如办公楼、购物中心等电梯内部,以播放动态广告、信息公告或品牌宣传等内容。本文件“一种电梯多媒体广告机”提供了关于这种设计装置的详细资料,旨在深入...

    行业文档-设计装置-数字化城市多媒体广告机.zip

    1. 商业推广:购物中心、餐饮店、电影院等地常设有多媒体广告机,以吸引顾客、提升品牌形象。 2. 公共服务:公共交通站点、机场、火车站等公共场所,可用于发布交通信息、公共服务公告等。 3. 城市美化:通过设计与...

    出租车车辆监控方案

    这使得调度中心能随时掌握每辆出租车的状态,确保车辆在合法范围内运行。 2. 紧急报警与自动拍照取证:在发生紧急情况时,系统会自动拍照并发送给管理中心,为处理事件提供关键证据。这极大地提高了警方处理犯罪...

    在GPS智能化调度系统中的公交车辆现场调度流程考虑.pdf

    流程包括司机到场报到、例行保养、车辆调度以及主动监控等步骤,确保所有车辆信息实时传输到总控中心,实现高效管理和调度。 综上所述,GPS智能化调度系统不仅提高了公交运营效率,降低了运营成本,还增强了对突发...

    大数据平台及在推荐广告的应用20.pptx

    任务调度中心负责管理和执行离线和实时任务,同时监控系统如Ganglia和Nagios确保了平台的稳定运行。 推荐广告的架构还涉及到内容质量和评分模型、用户画像体系、商业维度的分析,以及对用户行为的深度理解和语义...

    大数据平台及在推荐广告的应用.pptx

    数据存储中心如Hadoop、Hive、Hbase、Mysql、redis负责数据的持久化,任务调度中心则负责索引构建和内容质量评分。用户画像体系结合基本维度(如性别、年龄)和商业维度(如品类、品牌)等信息,利用网络爬虫引擎、...

    深圳振通公交电子站牌及智能调度管理系统解决方案.pdf

    3) 广告播放调度:中心控制广告播放,提高广告效益。 4) 核心调度管理:灵活调整线路车辆,确保资源合理配置。 二、停车场及枢纽调度管理子系统 1) 自动识别车辆信息:包括车牌、线路、到站时间等,提高管理效率。 ...

    行业文档-设计装置-多媒体广告播放装置.zip

    多媒体广告播放装置广泛应用于购物中心、地铁站、机场、餐厅、医疗机构等各种场所。它们能够精准定位目标受众,提供个性化广告推送,增强品牌影响力。此外,公共信息发布、教育训练、艺术展示等领域也日益依赖这种...

Global site tag (gtag.js) - Google Analytics