private void postHandle(List<QtyPredictTask> taskList, List<SkuSolutionVo> skus) throws IOException, BusinessException { List<QtyTaskResult> results = taskList.stream().map(task -> task.getOutput()).collect(Collectors.toList()); Map<String, Double> priceMap = results.parallelStream().collect(Collectors.groupingBy(e -> fetchGroupKey(e), Collectors.averagingDouble(QtyTaskResult::getPrice))); Map<String, Double> qtyMap = results.parallelStream().collect(Collectors.groupingBy(e -> fetchGroupKey(e), Collectors.summingDouble(QtyTaskResult::getQty))); Map<String, Double> revenueMap = results.parallelStream().collect(Collectors.groupingBy(e -> fetchGroupKey(e), Collectors.summingDouble(e -> e.getQty() * e.getPrice()))); Map<String, Double> profitMap = results.parallelStream().collect(Collectors.groupingBy(e -> fetchGroupKey(e), Collectors.summingDouble(e -> e.getQty() * e.getPrice() - e.getQty() * e.getCostPrice()))); for (SkuSolutionVo sku : skus) { String key = sku.getSkuCode() + PromConstant.seperator + sku.getLocationCode(); sku.setPromotionPrice(OperatorUtil.format3(priceMap.get(key))); sku.setAllSalesQty(OperatorUtil.format3(qtyMap.get(key))); sku.setAllSalesRevenue(OperatorUtil.format3(revenueMap.get(key))); sku.setAllSalesProfit(OperatorUtil.format3(profitMap.get(key))); sku.setPromotionQty(OperatorUtil.format3(sku.getAllSalesQty() - sku.getBaselineQty())); sku.setPromotionRevenue(OperatorUtil.format3(sku.getAllSalesRevenue() - sku.getBaselineRevenue())); sku.setPromotionProfit(OperatorUtil.format3(sku.getAllSalesProfit() - sku.getBaselineProfit())); } // 启动新线程,完成子任务的更新 thread(taskList); }
完成方法的计算
private String fetchGroupKey(QtyTaskResult qtyTaskResult) { return qtyTaskResult.getSkuCode() + PromConstant.seperator + qtyTaskResult.getLocationCode(); }
相关推荐
在这一背景下,华北电力大学的齐林海教授在2019年深圳全国第六届电能质量会议中提出将深度学习与流式计算应用于电能质量分析评估中,旨在克服传统方法在处理复杂扰动时的局限性。深度学习,尤其是卷积神经网络(CNN)...
流式计算是现代大数据处理领域中的重要技术,它主要用于处理和分析持续流入的实时数据流。在"日志类平台需要实时计算,有完整的预警策略,通知技术人员"这个场景中,我们可以深入探讨流式计算的核心概念、应用以及与...
大数据流式计算系统是应对海量实时数据处理需求而发展起来的一种新型计算模型。在现代互联网、物联网和金融等领域,实时数据的产生速度之快、规模之大,使得传统的批处理方式无法满足实时分析和决策的需求。因此,...
首先,文章介绍了流式处理相关概念,分析了流行的流式计算技术,并结合电信运营商需求提出了一套基于流式计算的DPI数据处理方案,应用于实际项目中以满足电信运营商对数据处理实时性的需求。通过实际应用,将DPI数据...
《Flink流式计算在实际生产项目中的应用详解》 Apache Flink,作为一个强大的开源流处理框架,已在大数据处理领域占据重要地位。本项目“Flink流式计算实际生产项目代码”聚焦于系统运维领域的流式处理,为初学者...
### 基于Zookeeper和Storm的车载流式计算框架 #### 一、引言 随着智能汽车行业的迅速发展,车载数据处理变得越来越重要。为了实时处理这些数据并从中提取有价值的信息,一种基于Zookeeper和Storm的车载流式计算...
流式计算是一种处理大规模数据的计算模式,它与传统的批处理计算有所不同,更注重实时性和连续性。在大数据时代,流式计算已经成为处理源源不断的数据流的关键技术,尤其适用于物联网、金融交易、社交媒体分析等场景...
《万字长文详解大数据流式计算框架 Flink 实战(中篇)》是一篇深入探讨 Flink 流处理框架的教程,旨在帮助读者理解 Flink 的核心概念、API 使用、窗口与水印机制、状态管理和容错机制。本文将详细解析其中的关键...
《地震大数据流式计算研究》 在当今信息化社会,数据的生产和积累速度达到了前所未有的水平,尤其是在地震监测领域。地震大数据不仅包括历史地震记录、地震波形数据,还包括地质构造、地磁、地电等多源信息,这些...
"百度新一代流式计算系统DStream3.pdf" 以下是基于提供的文件信息,生成的相关知识点: 流式计算系统的发展 流式计算系统的发展可以追溯到2008年Hadoop批量计算系统和2010年Bigpipe消息传输系统。2011年,DStream...
4. Kafka分布式消息队列:在地震数据流式计算过程中,Kafka作为数据分发器,其缓存和归类数据的功能是流式计算中必不可少的。它支持按Topic分类保存消息,并提供生产者(Producer)和消费者(Consumer)之间的消息...
随后,对系统支撑下的大数据分析技术和应用(包括深度学习、知识计算、社会计算与可视化等)进行了简要综述,总结了各种技术在大数据分析理解过程中的关键作用;最后梳理了大数据处理和分析面临的数据复杂性、计算复杂性...
在流式计算方面,MaxCompute引入了Stream SQL,这是一项完全托管的分布式数据流式处理服务,旨在解决实时数据分析的需求。 Stream SQL是MaxCompute中的亮点,它基于先进的分布式增量计算框架,能够实现低延迟的数据...
在MaxCompute中,流式计算是一项重要的功能,它允许用户以SQL的形式处理实时数据流,实现了低延迟的响应速度,极大地简化了开发流程。 流式计算,即Stream SQL,是一种基于分布式增量计算框架的服务,它可以实现...
在现代信息技术领域,流式计算引擎扮演着至关重要的角色,特别是在大数据分析和实时决策系统中。本主题聚焦于“一种基于流式计算引擎的实时数据处理方法及装置”,旨在探讨如何利用这种技术来优化设备装置的数据处理...
大数据流式计算关键技术及系统实例 大数据流式计算是指对实时产生的数据进行处理和分析,并将结果以流的形式实时反馈给用户。流式计算与传统批处理方式不同,它强调数据的实时性和连续性。在大数据应用场景中,流式...