背景介绍
一支广告的投放通常有很多的限定条件:地域、时段、频次、广告位、轮播顺序、用户标签【性别、年龄、爱好】。广告主的期望有良好的投放效果,广告平台期望有良好的投放量以及效果。假设媒体资源足够,那我们应该需要计算出每一分钟应该完成的投放量,调度就是为了完成这个目标。
调度需要的数据
- 容量曲线
- 分钟级别的排期
- 总的投放量
- 已经完成的数据量
计算公式
- 本分钟应该完成的量=(总的投放量-已完成量)*容量因子
- 容量因子=当前分钟容量/剩余排期容量 [计算过程数据完全依赖容量曲线]
已经完成的量
获取这个数据的方式有很多种方式,我们这里通过广告投放机器提供的接口获得,调度中心只按照自己的策略获取即可,由于投放机器有很多我们需要并发获取数据来加快整个调度的过程。通过CountDownLatch来控制是否完成了所有机器都的收账。
收账代码
import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import net.sf.json.JSONArray; import net.sf.json.JSONException; import net.sf.json.JSONObject; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.ClientProtocolException; import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import com.javagc.entity.ErrorStackMessage; import com.javagc.entity.MergePolicy; import com.javagc.context.CollectEffectDataContext; import com.javagc.context.ServerGroupContext.GROUPS; import com.javagc.entity.CollectEffectData; import com.javagc.entity.CollectRunStatus; import com.javagc.entity.CollectTaskDTO; import com.javagc.entity.CurrCastData.DELIVER_TYPE; import com.javagc.service.IEffectCollectService; import com.javagc.util.DispatcherConstants; import com.javagc.util.DispatcherUtil; /** * 任务 stevenDing<br> */ public class CollectTask implements Runnable { private static final Logger logger = DispatcherUtil.getLog(); private CollectEffectData collectEffectData; private Map<String, CollectRunStatus> runStatusMap; private CountDownLatch latch; private IEffectCollectService collect; private CollectTaskDTO taskDTO; private GROUPS group; // shared data private int generateJsonCost; private String result; private boolean runStatus; private long endTime; private List<Long> reTryRunTimeList = new ArrayList<Long>(); /** * * @param taskDTO * @param effectData * @param runLogMap * @param latch * @param collect */ public EffectCollectTask( CollectTaskDTO taskDTO, CollectEffectData collectEffectData, Map<String, CollectRunStatus> runStatusMap, CountDownLatch latch, IEffectCollectService collect, GROUPS group) { this.taskDTO = taskDTO; this.collectEffectData = collectEffectData; this.runStatusMap = runStatusMap; this.latch = latch; this.collect = collect; this.group = group; } public void run() { try { // retry it while (true) { // invoke effect data long retryBegin = System.currentTimeMillis(); try { invokeEffectData(); runStatus = true; } catch (Throwable e) { runStatus = false; logger.error(e.getMessage(), e); ErrorStackMessage msg = new ErrorStackMessage(e, "", new MergePolicy(false,true)); DispatcherConstants.collectErrorStackCollector.produce(msg); } finally { reTryRunTimeList.add(System.currentTimeMillis() - retryBegin); } //任务剩余时间 long remainingTime = (taskDTO.getBeginTime() + taskDTO.getTimeout()) - System.currentTimeMillis(); //成功或剩余时间不足则退出收账 if (isRunStatus()|| remainingTime < DispatcherConstants.COLLECTOR_BREAKTIME) { break; }else{ try{ Thread.sleep(DispatcherConstants.COLLECTOR_RETRYSLEEPMS); }catch(Exception e){} } } } finally { try { if (taskDTO.getClient() != null) { taskDTO.getClient().getConnectionManager().shutdown(); } } catch (Throwable e) { logger.error(e.getMessage(), e); ErrorStackMessage msg = new ErrorStackMessage(e, "", new MergePolicy(false,true)); DispatcherConstants.collectErrorStackCollector.produce(msg); } } try { // invoke succ if (isRunStatus()) { parseEffectData(); CollectEffectDataContext.I.updateCollectEffectData(group,collectEffectData); runStatus = true; } } catch (Throwable e) { logger.error(e.getMessage(), e); ErrorStackMessage msg = new ErrorStackMessage(e, "", new MergePolicy(false,true)); DispatcherConstants.collectErrorStackCollector.produce(msg); runStatus = false; } finally { try { endTime = System.currentTimeMillis(); CollectRunStatus runStatus = wrapperRunStatus(); // check main thread status if (!collect.isCollectCompleted()) { runStatusMap.put(taskDTO.getIp(), runStatus); } } catch (Throwable e) { logger.error(e.getMessage(), e); ErrorStackMessage msg = new ErrorStackMessage(e, "", new MergePolicy(false,true)); DispatcherConstants.collectErrorStackCollector.produce(msg); } latch.countDown(); } } protected void invokeEffectData() throws ClientProtocolException, IOException { HttpResponse response = taskDTO.getClient().execute(taskDTO.getGet()); HttpEntity entity = response.getEntity(); if (entity != null) { result = EntityUtils.toString(entity, "UTF-8"); } } /** * json --> {"cost" : 34,"s" : [{"id" : 111, "t" : 58},{"id" : 222,"t" : 66}], * "c" : [{"id" : 10, "t" : 23},{"id" : 15,"t" : 39}],"vs" : [{"id" : 20, "t" : 13},{"id" : 45,"t" : 22}], * "ps" : [{"id" : 30, "t" : 42},{"id" : 37,"t" : 18}]} */ protected void parseEffectData() throws Exception { JSONObject object = JSONObject.fromObject(getResult()); // cost time generateJsonCost = object.getInt("cost"); parseEffectData(DELIVER_TYPE.CPM, object, "s"); parseEffectData(DELIVER_TYPE.CPC, object, "c"); parseEffectData(DELIVER_TYPE.CPV, object, "vs"); parseEffectData(DELIVER_TYPE.CPP, object, "ps"); } /** * * @param type * @param jsonArray */ protected void parseEffectData(DELIVER_TYPE type,JSONObject object,String nodeName){ Map<Long, Long> effectMap = new HashMap<Long, Long>(); try{ //曝光数据 JSONArray jsonArray = object.getJSONArray(nodeName); //效果数据 for (int i = 0; i < jsonArray.size(); i++) { JSONObject data = (JSONObject) jsonArray.get(i); try{ if(isValidValues(data.getString("t"))){ long showTimes = data.getLong("t"); if (showTimes >= 0) { effectMap.put(data.getLong("id"), showTimes); } } }catch(JSONException e){ StringBuffer buffer = new StringBuffer(); buffer.append(type+" Effect Data, JSONException, jsonObject="); buffer.append(data.toString()).append(" , ip=").append(taskDTO.getIp()); logger.error(buffer.toString(),e); ErrorStackMessage msg = new ErrorStackMessage(e, buffer.toString(), new MergePolicy(true,true)); DispatcherConstants.collectErrorStackCollector.produce(msg); } } if (!collect.isCollectCompleted()) { // put result to map collectEffectData.putEffectData(taskDTO.getIp(), effectMap,type); } }catch(Exception e){ if(nodeName.equals("s") || nodeName.equals("c")){ StringBuffer buffer = new StringBuffer(); buffer.append(type+" Effect Data, JSONException, ps=null"); logger.error(buffer.toString(),e); ErrorStackMessage msg = new ErrorStackMessage(e, buffer.toString(), new MergePolicy(true,true)); DispatcherConstants.collectErrorStackCollector.produce(msg); } } } /** * 包装运行状态 * @return */ protected CollectRunStatus wrapperRunStatus() { CollectRunStatus runStatus = new CollectRunStatus(); runStatus.setIp(taskDTO.getIp()); runStatus.setGenerateJsonCost(getGenerateJsonCost()); runStatus.setThreadRunTime(getEndTime() - taskDTO.getBeginTime()); runStatus.setRunStatus(isRunStatus()); runStatus.setReTryRunTimeList(getReTryRunTimeList()); return runStatus; } /** * 检查数据是否有效 * @param content * @return */ private boolean isValidValues(String content){ if(content==null || content.length()==0 || content.getBytes().length>12){ return false; } return true; } public List<Long> getReTryRunTimeList() { return reTryRunTimeList; } public long getEndTime() { return endTime; } public boolean isRunStatus() { return runStatus; } public String getResult() { return result; } public int getGenerateJsonCost() { return generateJsonCost; } }
主进程控制
import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.http.Header; import org.apache.http.HeaderElement; import org.apache.http.HttpEntity; import org.apache.http.HttpException; import org.apache.http.HttpRequest; import org.apache.http.HttpRequestInterceptor; import org.apache.http.HttpResponse; import org.apache.http.HttpResponseInterceptor; import org.apache.http.client.HttpClient; import org.apache.http.client.entity.GzipDecompressingEntity; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.params.CoreConnectionPNames; import org.apache.http.protocol.HttpContext; import org.slf4j.Logger; import com.javagc.context.ServerGroupContext; import com.javagc.context.ServerGroupContext.GROUPS; import com.javagc.entity.CollectRunStatus; import com.javagc.entity.CollectEffectData; import com.javagc.entity.CollectTaskDTO; import com.javagc.task.EffectCollectTask; import com.javagc.util.DispatcherConstants; import com.javagc.util.DispatcherUtil; /** * 收账服务<br> * 1、一组服务器收账使用最大时长,@see DispatcherConstants.COLLECTOR_TOTALTIMEOUT * @author stevending<br> * */ public class CollectService implements ICollectService { private static final Logger logger = DispatcherUtil.getLog(); /** * 分组信息 */ private GROUPS groupNum; /** * 收账结果日志 */ private Map<String,CollectRunStatus> runStatusMap = null; /** * 结果数据 */ private CollectEffectData effectData = null; /** * 检测是否完成收账 */ private CountDownLatch latch = null; /** * 收账完成状态 true=完成、false=未完成 */ private AtomicBoolean collectCompleted = new AtomicBoolean(false); /** * 构造方法 * @param groupNum */ public EffectCollectService(GROUPS groupNum){ this.groupNum = groupNum; } @Override public CollectEffectData doCollect() throws InterruptedException{ long beginTime = System.currentTimeMillis(); try{ //exec task exeTask(); } finally{ collectCompleted.set(true); //write run status writeRunStatus(); } logger.info(groupNum+" Total runTime:"+(System.currentTimeMillis()-beginTime) + " ms"); return effectData; } /** * 执行任务 * @throws InterruptedException */ private void exeTask() throws InterruptedException{ long beginTime = System.currentTimeMillis(); List<String> ips = ServerGroupContext.I.getGroupServerIps(groupNum); latch = new CountDownLatch(ips.size()); runStatusMap = new ConcurrentHashMap<String,CollectRunStatus>(); effectData = new CollectEffectData(); for(int i=0;i<ips.size();i++){ CollectTaskDTO staskDTO = new CollectTaskDTO( getHttpClient(), getHttpGet(ips.get(i),DispatcherConstants.COLLECTOR_PORT), ips.get(i), DispatcherConstants.COLLECTOR_TOTALTIMEOUT, beginTime); CollectTask stask = new CollectTask(staskDTO,effectData,runStatusMap,latch,this,groupNum); Thread st = new Thread(stask); st.setDaemon(true); st.start(); } //等待收账完成 latch.await(DispatcherConstants.COLLECTOR_TOTALTIMEOUT, TimeUnit.MILLISECONDS); //TODO 清理未完成的线程 } /** * 记录运行状态 */ protected void writeRunStatus(){ StringBuffer logBuffer = new StringBuffer("收账过程日志\n"); List<String> ips = ServerGroupContext.I.getGroupServerIps(groupNum); for(int i=0;i<ips.size();i++){ String ip = ips.get(i); if(runStatusMap.get(ip) == null){ CollectRunStatus consumerStatus = new CollectRunStatus(); consumerStatus.setIp(ip); consumerStatus.setRunStatus(false); consumerStatus.setThreadRunTime(DispatcherConstants.COLLECTOR_TOTALTIMEOUT); runStatusMap.put(ip, consumerStatus); } logBuffer.append(runStatusMap.get(ip).toString()).append("\n"); } logger.info(logBuffer.toString()); } /** * 获取httpClient * @return */ protected HttpClient getHttpClient(){ DefaultHttpClient httpclient = new DefaultHttpClient(); httpclient.addRequestInterceptor(new HttpRequestInterceptor() { public void process(final HttpRequest request,final HttpContext context) throws HttpException, IOException { if (!request.containsHeader("Accept-Encoding")) { request.addHeader("Accept-Encoding", "gzip"); } } }); httpclient.addResponseInterceptor(new HttpResponseInterceptor() { public void process(final HttpResponse response,final HttpContext context) throws HttpException, IOException { HttpEntity entity = response.getEntity(); if (entity != null) { Header ceheader = entity.getContentEncoding(); if (ceheader != null) { HeaderElement[] codecs = ceheader.getElements(); for (int i = 0; i < codecs.length; i++) { if (codecs[i].getName().equalsIgnoreCase("gzip")) { response.setEntity(new GzipDecompressingEntity(response.getEntity())); return; } } } } } }); httpclient.getParams().setParameter(CoreConnectionPNames.CONNECTION_TIMEOUT,DispatcherConstants.COLLECTOR_CONNTIMEOUT); httpclient.getParams().setParameter(CoreConnectionPNames.SO_TIMEOUT,DispatcherConstants.COLLECTOR_READTIMEOUT); return httpclient; } protected HttpGet getHttpGet(String ip,int port){ StringBuffer buffer = new StringBuffer(); buffer.append("http://").append(ip); buffer.append(":").append(port); buffer.append(DispatcherConstants.COLLECTOR_URL); return new HttpGet(buffer.toString()); } @Override public boolean isCollectCompleted() { return collectCompleted.get(); } }
相关推荐
1. **实时信息交互**:公交车载信息终端与调度中心通过GPRS实时交换数据,提供乘车参考信息,如到站时间、道路状况等,同时在车内向乘客发布。 2. **公交电子信息站牌**:站牌可显示车辆位置、到站时间、交通情况、...
雅迅公司的解决方案涵盖了系统设计的各个方面,包括系统的目标、原则、框架结构、业务模块、网络结构以及中心系统的架构。该系统具备多种特色功能,如车载DVR(数字视频录像机)功能,用于记录车辆行驶过程中的视频...
数据库是整个系统的数据存储中心,系统采用了关系型数据库管理系统,如SQL Server或MySQL,用于存储广告信息、预约记录、员工资料等大量数据。数据的增删改查操作高效稳定,且支持数据备份和恢复,确保数据安全。...
公交车辆的位置信息通过GPRS模块发送回调度中心,调度中心利用GIS系统实时监控车辆位置和速度,以便进行有效调度。 总结: 基于DGPS_GPRS的智能公交调度系统利用先进的定位和通信技术,提高了公交服务的精度和效率...
广告机主要用于公共场所,如购物中心、地铁站、机场等,通过高清晰度的显示屏播放动态或静态的广告内容,吸引人们的注意力,实现商业信息的有效传播。其工作原理涉及到图像处理、网络通信、多媒体播放等多个技术领域...
行车监控是系统的重要组成部分,通过实时监控公交车辆的行驶状态,可以及时掌握车辆的位置、速度、行驶路线等信息,有助于调度中心对公交线路进行有效管理。用户界面清晰直观,方便操作人员监控和调度。 智能调度...
系统通过车载北斗系统获取车辆的精确位置信息,通过车载平台传递至调度分配中心,中心在电子地图上直观展示车辆分布,从而进行安全、合理的调度。 该系统具备以下主要功能: 1. **出租车北斗安全监控系统**:24...
浙科物流管理模拟教学软件通过模拟实际操作,让学生深入理解这些功能,并体验总经理、调度中心、仓库中心、运输中心的角色分工与协作。 实验目标主要在于加深对物流十个功能的理解,并锻炼团队合作能力。总经理的...
例如,在购物中心,大型LED屏幕播放的绚丽广告能够吸引顾客驻足;在公共交通工具上,车载显示屏播放的广告能够覆盖大量流动人群;在电影院,预映广告则为观众带来丰富的观影体验。 在设备装置方面,多媒体广告的...
在实验过程中,学生分为两人一组,一人扮演两个角色,共同协作,体验总经理、调度中心、仓库中心和运输中心的不同职责,实现理论与实践的结合。 1. **总经理的角色与任务** - **订单受理**:总经理负责接受发货单...
一种基于ARM的多媒体广告机是一种智能的、现代化的广告展示设备,主要利用先进的微...此外,这种广告机的可扩展性和灵活性使得其能够在各种场景下应用,如购物中心、公共交通工具、办公楼宇等,满足不同环境和需求。
这种设备通常安装在人流密集的公共场所,如购物中心、地铁站、公交站等,利用其背面的大屏幕展示各种动态或静态广告内容,吸引过往行人的注意力。 在《背式移动广告终端.pdf》这份资料中,可能会详细介绍该设备的...
2. **电话叫车服务**:通过调度中心的电话系统,可以为乘客提供便捷的叫车服务,减少空驶率,提高整体运营效率。 3. **紧急报警功能**:安装在车辆内部的报警开关一旦被触发,会立即向监控中心发送报警信号及车辆...
电梯多媒体广告机是一种创新的商业展示设备,常用于公共场所如办公楼、购物中心等电梯内部,以播放动态广告、信息公告或品牌宣传等内容。本文件“一种电梯多媒体广告机”提供了关于这种设计装置的详细资料,旨在深入...
1. 商业推广:购物中心、餐饮店、电影院等地常设有多媒体广告机,以吸引顾客、提升品牌形象。 2. 公共服务:公共交通站点、机场、火车站等公共场所,可用于发布交通信息、公共服务公告等。 3. 城市美化:通过设计与...
这使得调度中心能随时掌握每辆出租车的状态,确保车辆在合法范围内运行。 2. 紧急报警与自动拍照取证:在发生紧急情况时,系统会自动拍照并发送给管理中心,为处理事件提供关键证据。这极大地提高了警方处理犯罪...
流程包括司机到场报到、例行保养、车辆调度以及主动监控等步骤,确保所有车辆信息实时传输到总控中心,实现高效管理和调度。 综上所述,GPS智能化调度系统不仅提高了公交运营效率,降低了运营成本,还增强了对突发...
任务调度中心负责管理和执行离线和实时任务,同时监控系统如Ganglia和Nagios确保了平台的稳定运行。 推荐广告的架构还涉及到内容质量和评分模型、用户画像体系、商业维度的分析,以及对用户行为的深度理解和语义...
数据存储中心如Hadoop、Hive、Hbase、Mysql、redis负责数据的持久化,任务调度中心则负责索引构建和内容质量评分。用户画像体系结合基本维度(如性别、年龄)和商业维度(如品类、品牌)等信息,利用网络爬虫引擎、...
3) 广告播放调度:中心控制广告播放,提高广告效益。 4) 核心调度管理:灵活调整线路车辆,确保资源合理配置。 二、停车场及枢纽调度管理子系统 1) 自动识别车辆信息:包括车牌、线路、到站时间等,提高管理效率。 ...