- 浏览: 564738 次
- 性别:
- 来自: 北京
文章分类
- 全部博客 (618)
- java (109)
- Java web (43)
- javascript (52)
- js (15)
- 闭包 (2)
- maven (8)
- 杂 (28)
- python (47)
- linux (51)
- git (18)
- (1)
- mysql (31)
- 管理 (1)
- redis (6)
- 操作系统 (12)
- 网络 (13)
- mongo (1)
- nginx (17)
- web (8)
- ffmpeg (1)
- python安装包 (0)
- php (49)
- imagemagic (1)
- eclipse (21)
- django (4)
- 学习 (1)
- 书籍 (1)
- uml (3)
- emacs (19)
- svn (2)
- netty (9)
- joomla (1)
- css (1)
- 推送 (2)
- android (6)
- memcached (2)
- docker、 (0)
- docker (7)
- go (1)
- resin (1)
- groovy (1)
- spring (1)
最新评论
-
chokee:
...
Spring3 MVC 深入研究 -
googleyufei:
很有用, 我现在打算学学Python. 这些资料的很及时.
python的几个实用网站(转的) -
hujingwei1001:
太好了找的就是它
easy explore -
xiangtui:
例子举得不错。。。学习了
java callback -
幻影桃花源:
太好了,謝謝
Spring3 MVC 深入研究
原文地址:http://www.bkjia.com/Javabc/997348.html
redis队列及多线程应用,redis队列多线程
由于xxx平台上自己的博客已经很久没更新了,一直以来都是用的印象笔记来做工作中知识的积累存根,不知不觉印象笔记里已经有了四、五百遍文章。为了从新开始能与广大攻城狮共同提高技术能力与水平,随决心另起炉灶在新的博客与大家分享
经过一段时间项目的沉淀之后,对实际应用中的多线程开发及队列使用产生了深厚的兴趣,也将<<java并发编程实战>>仔细的阅读了两三遍,也看了很多并发编程的实践项目,也有了深刻的理解与在实践中合理应用队列、多线程开发的应用场景
1、真实应用场景描述:
由于一段时间以来要针对公司整个电商平台包括官网、移动端所有的交易数据进行统计,统计指标包括:pv、uv、实付金额、转化率、毛利率等等,按照各种不同的维度来统计计算出当前交易系统的各个指标的数据,但要求该项目是独立的,没有任务其它资源的协助及接品提供。经过一番xxxx思考讨论之后。业务上决定用以下解决方案:
A: 用一个定时服务每隔10秒去别的系统数据库抓取上一次查询时间以来新确认的订单(这种订单表示已经支付完在或者客户已经审核确认了),然后将这些订单的唯一编号放入redis队列。
B: 由于用到了队列,根据经验自然而然的想到了 启动单独的线程去redis队列中不断获取要统计处理的订单编号,然后将获取到的订单编号放入线程池中进行订单的统计任务处理。
开发实现:
FetchConfirmOrdersFromErpJob.java
1 /**
2 * 1、从redis中获取上次查询的时间戳
3 * 2、将当前时间戳放入到redis中,以便 下次按这个时间查询
4 * 3、去erp订单表查询confirm_time>=上次查询的时间的订单,放入队列中
5 */
6 @Scheduled(cron = "0/30 * * * * ?")
7 public void start(){
8 logger.info("FetchConfirmOrdersFromErpJob start................."+ new Date());
9 StopWatch watch=new StopWatch();
10 watch.start();
11 //上次查询的时间
12 String preQueryTimeStr=this.readRedisService.get(Constans.CACHE_PREQUERYORDERTIME);
13
14 Date now=new Date();
15 if(StringUtils.isBlank(preQueryTimeStr)){
16 preQueryTimeStr=DateFormatUtils.format(DateUtils.addHours(now, -1), Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS);//第一次查询之前一个小时的订单
17 // preQueryTimeStr="2015-05-07 10:00:00";//本地测试的时候使用
18 }
19 //设置当前时间为上次查询的时间
20 this.writeRedisService.set(Constans.CACHE_PREQUERYORDERTIME, DateFormatUtils.format(now, Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS));
21
22 List<Map<String, Object>> confirmOrderIds = this.erpOrderService.selectOrderIdbyConfirmtime(preQueryTimeStr);
23 if(confirmOrderIds==null){
24 logger.info("query confirmOrderIds is null,without order data need dealth..........");
25 return;
26 }
27 for (Map<String, Object> map : confirmOrderIds) {
//将订单编号放入队列中
28 this.writeRedisService.lpush(Constans.CACHE_ORDERIDS, map.get("channel_orderid").toString());
29 logger.info("=======lpush orderid:"+map.get("channel_orderid").toString());
30 }
31
32 watch.stop();
33 logger.info("FetchConfirmOrdersFromErpJob end................."+ new Date()+" total cost time:"+watch.getTime()+" dealth data count:"+confirmOrderIds.size());
34 }
OrderCalculate.java 队列获取订单线程
1 public class OrderCalculate {
2
3 private static final Log logger = LogFactory.getLog(OrderCalculate.class);
4
5 @Autowired
6 private static WriteRedisService writeRedisService;
7
8 private static ExecutorService threadPool=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4
9 ,new TjThreadFactory("CalculateAmount"));
10 static{
11 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
12 @Override
13 public void run() {
14 QueuePop.stop();
15 threadPool.shutdown();
16 }
17 }));
18 }
19
20 public void init(){
21 if(writeRedisService==null){
22 writeRedisService=SpringContext.getBean(WriteRedisService.class);
23 }
24 new Thread(new QueuePop(),"OrderIdQueuePop").start();//由于是用redis做的队列,所以只要使用一个线程从队列里拿就ok
25 }
26
27 static class QueuePop implements Runnable{
28
29 volatile static boolean stop=false;
30
31 @Override
32 public void run() {
33 while(!stop){
34 //不断循环从队列里取出订单id
35 String orderId=null;
36 try {
37 orderId = writeRedisService.rpop(Constans.CACHE_ORDERIDS);
38 if(orderId!=null){
39 logger.info("pop orderId:"+orderId);
//将获取的订单编号交给订单统计任务处理线程处理
40 threadPool.submit(new CalculateAmount(Integer.parseInt(orderId),new Date()));
41 }
42 } catch (Exception e1) {
43 logger.error("",e1);
44 }
45 //根据上线后的业务反馈来确定是否改成wait/notify策略来及时处理确认的订单
46 try {
47 Thread.sleep(10);
48 } catch (InterruptedException e) {
49 logger.error("",e);
50 // Thread.currentThread().interrupt();
51 //stop=true;//线程被打算继续执行,不应该被关闭,保证该线程永远不会死掉
52 }
53 }
54 }
55
56 public static void stop(){
57 stop=true;
58 }
59
60 }
61
62 }
CalculateAmoiunt.java 订单任务处理
1 public class CalculateAmount implements Runnable {
2 private static final Log logger = LogFactory.getLog(CalculateAmount.class);
3 private int orderId;
4 private Date now;//确认时间 这个时间有一定的延迟,基本可以忽略,如果没什么用
5 private OrderService orderServices;
6 private OrdHaveProductService ordHaveProductService;
7 private OrdPayByCashbackService ordPayByCashbackService;
8 private OrdPayByCouponService ordPayByCouponService;
9 private OrdPayByGiftCardService ordPayByGiftCardService;
10 private StatisticsService statisticsService;
11 private WriteRedisService writeRedisService;
12 private ReadRedisService readRedisService;
13 private ErpOrderGoodsService erpOrderGoodsService;
14 private ErpOrderService erpOrderService;
15
16
17 public CalculateAmount(int orderId,Date now) {
18 super();
19 this.orderId = orderId;
20 this.now=now;
21 orderServices=SpringContext.getBean(OrderService.class);
22 ordHaveProductService=SpringContext.getBean(OrdHaveProductService.class);
23 ordPayByCashbackService=SpringContext.getBean(OrdPayByCashbackService.class);
24 ordPayByCouponService=SpringContext.getBean(OrdPayByCouponService.class);
25 ordPayByGiftCardService=SpringContext.getBean(OrdPayByGiftCardService.class);
26 statisticsService=SpringContext.getBean(StatisticsService.class);
27 writeRedisService=SpringContext.getBean(WriteRedisService.class);
28 readRedisService=SpringContext.getBean(ReadRedisService.class);
29 erpOrderGoodsService=SpringContext.getBean(ErpOrderGoodsService.class);
30 erpOrderService=SpringContext.getBean(ErpOrderService.class);
31 }
32
33 @Override
34 public void run() {
35 logger.info("CalculateAmount task run start........orderId:"+orderId);
36 StopWatch watch=new StopWatch();
37 watch.start();
38 /**
39 * 取出订单相关的所有数据同步到统计的库中
40 */
41 //TODO 考虑要不要将下面所有操作放到一个事务里面
42 List<Map<String, Object>> orders = this.orderServices.selectOrderById(orderId);
43 if(orders!=null&&orders.size()>0){
44 Map<String, Object> order = orders.get(0);
45
46 String orderSN=U.nvl(order.get("OrderSN"));//订单编号
47 Integer userId=U.nvlInt(order.get("usr_UserID"),null);//用户d
48 Integer status=U.nvlInt(order.get("Status"),null);//状态
49 Date createTime=now;//(Date)order.get("CreateTime");//创建时间
50 Date modifyTime=now;//(Date)order.get("ModifyTime");// 更新时间
51 BigDecimal discountPrice=U.nvlDecimal(order.get("DiscountPrice"),null);//优惠总额 满减金额
52 BigDecimal payPrice=U.nvlDecimal(order.get("PayPrice"), null);//实付金额
53 BigDecimal totalPrice=U.nvlDecimal(order.get("TotalPrice"), null);//总金额
54
55 //从erp里查询出订单的确认时间
56 int dbConfirmTime=0;
57 try {
58 dbConfirmTime = this.erpOrderService.selectConfirmTimeByOrderId(orderId);
59 } catch (Exception e2) {
60 logger.error("",e2);
61 }
62 Date ct=new Date(dbConfirmTime*1000L);
63
64 int[] dates=U.getYearMonthDayHour(ct);//
65 if(modifyTime!=null){
66 dates=U.getYearMonthDayHour(modifyTime);//
67 }
68 int year=dates[0];//年
69 int month=dates[1];//月
70 int day=dates[2];//日
71 int hour=dates[3];//小时
72
73 String ordersId=orderId+"";//生成订单id
74
75 //查询订单的来源和搜索引擎关键字
76 String source="";
77 String seKeyWords="";
78 List<OrdersData> orderDataList=this.statisticsService.selectOrdersDataByOrdersId(orderSN);
79 if(orderDataList!=null&&!orderDataList.isEmpty()){
80 OrdersData ordersData = orderDataList.get(0);
81 source=ordersData.getSource();
82 seKeyWords=ordersData.getSeKeyWords();
83 }
84
85 //TODO 将订单入库
86 ArrayList<RelOrders> relOrdersList = Lists.newArrayList();
87 RelOrders relOrders=new RelOrders(orderSN,userId+"",Byte.valueOf(status+""),source,seKeyWords,IsCal.未计算.getFlag(),(byte)U.getSimpleYearByYear(year),(byte)month,(byte)day,(byte)hour,ct,createTime,modifyTime);
88 relOrdersList.add(relOrders);
89
90 try {
91 relOrders.setConfirmTime(ct);
92 //查询RelOrders是否存在
93 RelOrders dbOrders=this.statisticsService.selectByPrimaryKey(orderSN);
94 if(dbOrders!=null){
95 //更新
96 dbOrders.setStatus(Byte.valueOf(status+""));
97 dbOrders.setConfirmTime(ct);
98 dbOrders.setModifyTime(modifyTime);
99 this.statisticsService.updateByPrimaryKeySelective(dbOrders);
100 return;
101 }else{
102 Integer relResult=this.statisticsService.insertRelOrdersBatch(relOrdersList);
103 }
104 } catch (Exception e) {
105 logger.error("insertRelOrdersBatch error",e);
106 }
107 /**
108 * 查这个订单的返现、优惠券、礼品卡 的金额
109 */
110 List<Map<String, Object>> cashs = this.ordPayByCashbackService.selectDecutionPriceByOrderId(orderId);
111 List<Map<String, Object>> coupons = this.ordPayByCouponService.selectDecutionPriceByOrderId(orderId);
112
113 BigDecimal cashAmount=U.getValueByKey(cashs, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//返现金额
114 BigDecimal couponAmont=U.getValueByKey(coupons, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//红包金额
115 /**
116 * 查询出这个订单的所有商品
117 */
118 List<Map<String, Object>> products=null;
119 Map<String,Object> productToKeyWordMap=Maps.newHashMap();
120 try {
121 products = this.ordHaveProductService.selectByOrderId(orderId);
122 List<OrdersItemData> ordersItemDataList=this.statisticsService.selectOrdersItemDataByOrdersId(orderSN);
123 if(ordersItemDataList!=null){
124 for (OrdersItemData ordersItemData : ordersItemDataList) {
125 productToKeyWordMap.put(ordersItemData.getItemId(), ordersItemData.getKeyWords());
126 }
127 }
128 } catch (Exception e1) {
129 logger.error("",e1);
130 }
131 if(products!=null){
132 ArrayList<RelOrdersItem> relOrdersItemList = Lists.newArrayList();
133 for (Map<String, Object> product : products) {
134 Integer productId=U.nvlInt(product.get("pro_ProductID"), null);//商品Id
135 Integer buyNo=U.nvlInt(product.get("BuyNo"), 0);//购买数量
136 String SN=U.nvl(product.get("SN"),"");
137 BigDecimal buyPrice=U.nvlDecimal(product.get("BuyPrice"), BigDecimal.ZERO);//购买价格
138 BigDecimal buyTotalPrice=U.nvlDecimal(product.get("BuyTotalPrice"), null);//购买总价格
139 BigDecimal productPayPrice=U.nvlDecimal(product.get("PayPrice"), null);//单品实付金额
140
141 BigDecimal cost=null;//商品成本 TODO 调别人的接口
142 BigDecimal realtimeAmount=null;//实付金额
143
144 BigDecimal pdCashAmount=BigDecimal.ZERO;//每个商品的返现
145 BigDecimal pdcouponAmont=BigDecimal.ZERO;//每个商品的优惠券
146
147 //商品价格所占订单比例
148 if(buyTotalPrice!=null&&totalPrice!=null&&totalPrice.doubleValue()!=0){
149 pdCashAmount=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(cashAmount).setScale(2,BigDecimal.ROUND_HALF_UP);
150 pdcouponAmont=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(couponAmont).setScale(2,BigDecimal.ROUND_HALF_UP);
151 discountPrice=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(discountPrice).setScale(2,BigDecimal.ROUND_HALF_UP);
152 }
153
154 realtimeAmount=buyTotalPrice.subtract((pdCashAmount.add(pdcouponAmont).add(discountPrice))).setScale(2,BigDecimal.ROUND_HALF_UP);
155
156 RelOrdersItem item=new RelOrdersItem(U.randomUUID(),orderSN,productId,SN,buyNo,realtimeAmount,U.nvl(productToKeyWordMap.get(productId)));
157
158 relOrdersItemList.add(item);
159
160 //如果确认时间属于同一天的话,将商品实付金额放入到redis排行榜中
161 if((status==1||status==5||status==6||status==7||status==11)&&DateUtils.isSameDay(new Date(), ct)){
162 //如果订单的状态是这几种,刚将该商品加入到实付金额的排行 榜中
163 dates=U.getYearMonthDayHour(ct);//
164 int days=dates[2];
165 //某一个商品某一天的实付金额
166 BigDecimal itemRelAmount=BigDecimal.ZERO;
167 //从redis里取出这个商品的实付金额,然后累加
168 String itemRelAmountStr=readRedisService.get(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
169 if(StringUtils.isNotBlank(itemRelAmountStr)){
170 itemRelAmount=new BigDecimal(itemRelAmountStr);
171 }
172 realtimeAmount=itemRelAmount.add(realtimeAmount);
173 writeRedisService.set(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days, realtimeAmount.toPlainString());
174 writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
175 writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTSS_KEY+days, realtimeAmount.doubleValue(), productId+"");
176 //确认的销量
177 Long itemCount= writeRedisService.incrBy(Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days,buyNo);
178 writeRedisService.zadd(Constans.CACHE_ITEMSALES_SS_KEY_PRDFIX+days, itemCount, productId+"");
179
180 String itemType="";
181 Map<String, String> pMap = this.readRedisService.hmget(Constans.CACHE_PRODUCT_KEY+productId);
182 itemType=pMap.get("categoryId");
183 if(StringUtils.isNotBlank(itemType)){
184 if(ProductCategory.isGuanBai(itemType)){
185 //如果是白酒 官白的访客数排行
186 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTWHITESS_KEY+days, realtimeAmount.doubleValue(), productId+"");//
187 //确认的销量排行
188 this.writeRedisService.zadd(Constans.CACHE_ITEMSALESWHITE_SS_KEY_PRDFIX+days, itemCount, productId+"");//
189 }else if(ProductCategory.isGuanHong(itemType)){
190 //官红的访客数排行
191 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTREDSS_KEY+days, realtimeAmount.doubleValue(), productId+"");//
192 //确认的销量排行
193 this.writeRedisService.zadd(Constans.CACHE_ITEMSALESRED_SS_KEY_PRDFIX+days, itemCount, productId+"");//
194 }
195 }
196
197 //某一个商品的销量加入删除列表
198 writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
199 }
200 }
201 try {
202 //TODO 将订单商品明细入库
203 this.statisticsService.insertRelOrdersItemBatch(relOrdersItemList);
204 //再将订单的状态改为已计算
205 this.statisticsService.updateIsCal(orderSN,IsCal.已计算.getFlag());//将是否计算改成已计算
206 //该订单的所有商品的成本同步到现在的库中。
207 this.calOrderProductCostSync(orderId,orderSN,products);
208 } catch (Exception e) {
209 logger.error("insertRelOrdersItemBatch or updateIsCal error",e);
210 }
211 }
212 }
213 watch.stop();
214 logger.info("CalculateAmount task run end........total cost time:"+watch.getTime()+" orderId:"+orderId);
215 }
216
217 private void calOrderProductCostSync(int orderId,String orderSN,List<Map<String, Object>> products){
218 List<Map<String, Object>> ordersList = this.erpOrderGoodsService.selectProductCostByOrderSN(orderSN);
219 if(ordersList==null||ordersList.isEmpty()){
220 logger.error("according orderId to query some data from erp return is null.........");
221 return;
222 }
223 Map<String, String> itemIdToItemSnMap = U.convertToMapByList(products, "pro_ProductID", "SN");
224
225 List<RelItemCosts> list=Lists.newArrayList();
226 for (Map<String, Object> map : ordersList) {
227 RelItemCosts itemCost=new RelItemCosts();
228 if(map==null){
229 continue;
230 }
231 Integer itemId=U.nvlInt(map.get("goods_id"),-99);
232 BigDecimal costs=U.nvlDecimal(map.get("Dynamic_price"), BigDecimal.ZERO);
233 itemCost.setId(U.randomUUID());
234 itemCost.setOrdersId(orderId+"");
235 itemCost.setOrdersNo(orderSN);
236 itemCost.setItemId(itemId);
237 itemCost.setItemNo(itemIdToItemSnMap.get(itemId+""));
238 itemCost.setCosts(costs);
239 itemCost.setCreateTime(new Date());
240 itemCost.setModifyTime(new Date());
241 list.add(itemCost);
242 }
243
244 this.statisticsService.insertRelItemCostsBatch(list);
245
246 }
247
248 }
注意:
1、redis2.6版本使用lpush、rpop出列的时候会丢失数据。换成2.8及以上的版本运行正常。
2、由于应用会部署到多个结点,所以无法直接采用java的BlockingQueue阻塞队列,帮采用redis提供的队列支持。
3、如果要做到统计的绝对实时,最好采用大数据的实时计算的解决方案:kafka+storm 来实现
以上为队列结合线程的实践案例,供大家一起探讨。
转载请注明出处 ,请大家尊重作者的劳动成果。
redis队列及多线程应用,redis队列多线程
由于xxx平台上自己的博客已经很久没更新了,一直以来都是用的印象笔记来做工作中知识的积累存根,不知不觉印象笔记里已经有了四、五百遍文章。为了从新开始能与广大攻城狮共同提高技术能力与水平,随决心另起炉灶在新的博客与大家分享
经过一段时间项目的沉淀之后,对实际应用中的多线程开发及队列使用产生了深厚的兴趣,也将<<java并发编程实战>>仔细的阅读了两三遍,也看了很多并发编程的实践项目,也有了深刻的理解与在实践中合理应用队列、多线程开发的应用场景
1、真实应用场景描述:
由于一段时间以来要针对公司整个电商平台包括官网、移动端所有的交易数据进行统计,统计指标包括:pv、uv、实付金额、转化率、毛利率等等,按照各种不同的维度来统计计算出当前交易系统的各个指标的数据,但要求该项目是独立的,没有任务其它资源的协助及接品提供。经过一番xxxx思考讨论之后。业务上决定用以下解决方案:
A: 用一个定时服务每隔10秒去别的系统数据库抓取上一次查询时间以来新确认的订单(这种订单表示已经支付完在或者客户已经审核确认了),然后将这些订单的唯一编号放入redis队列。
B: 由于用到了队列,根据经验自然而然的想到了 启动单独的线程去redis队列中不断获取要统计处理的订单编号,然后将获取到的订单编号放入线程池中进行订单的统计任务处理。
开发实现:
FetchConfirmOrdersFromErpJob.java
1 /**
2 * 1、从redis中获取上次查询的时间戳
3 * 2、将当前时间戳放入到redis中,以便 下次按这个时间查询
4 * 3、去erp订单表查询confirm_time>=上次查询的时间的订单,放入队列中
5 */
6 @Scheduled(cron = "0/30 * * * * ?")
7 public void start(){
8 logger.info("FetchConfirmOrdersFromErpJob start................."+ new Date());
9 StopWatch watch=new StopWatch();
10 watch.start();
11 //上次查询的时间
12 String preQueryTimeStr=this.readRedisService.get(Constans.CACHE_PREQUERYORDERTIME);
13
14 Date now=new Date();
15 if(StringUtils.isBlank(preQueryTimeStr)){
16 preQueryTimeStr=DateFormatUtils.format(DateUtils.addHours(now, -1), Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS);//第一次查询之前一个小时的订单
17 // preQueryTimeStr="2015-05-07 10:00:00";//本地测试的时候使用
18 }
19 //设置当前时间为上次查询的时间
20 this.writeRedisService.set(Constans.CACHE_PREQUERYORDERTIME, DateFormatUtils.format(now, Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS));
21
22 List<Map<String, Object>> confirmOrderIds = this.erpOrderService.selectOrderIdbyConfirmtime(preQueryTimeStr);
23 if(confirmOrderIds==null){
24 logger.info("query confirmOrderIds is null,without order data need dealth..........");
25 return;
26 }
27 for (Map<String, Object> map : confirmOrderIds) {
//将订单编号放入队列中
28 this.writeRedisService.lpush(Constans.CACHE_ORDERIDS, map.get("channel_orderid").toString());
29 logger.info("=======lpush orderid:"+map.get("channel_orderid").toString());
30 }
31
32 watch.stop();
33 logger.info("FetchConfirmOrdersFromErpJob end................."+ new Date()+" total cost time:"+watch.getTime()+" dealth data count:"+confirmOrderIds.size());
34 }
OrderCalculate.java 队列获取订单线程
1 public class OrderCalculate {
2
3 private static final Log logger = LogFactory.getLog(OrderCalculate.class);
4
5 @Autowired
6 private static WriteRedisService writeRedisService;
7
8 private static ExecutorService threadPool=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4
9 ,new TjThreadFactory("CalculateAmount"));
10 static{
11 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
12 @Override
13 public void run() {
14 QueuePop.stop();
15 threadPool.shutdown();
16 }
17 }));
18 }
19
20 public void init(){
21 if(writeRedisService==null){
22 writeRedisService=SpringContext.getBean(WriteRedisService.class);
23 }
24 new Thread(new QueuePop(),"OrderIdQueuePop").start();//由于是用redis做的队列,所以只要使用一个线程从队列里拿就ok
25 }
26
27 static class QueuePop implements Runnable{
28
29 volatile static boolean stop=false;
30
31 @Override
32 public void run() {
33 while(!stop){
34 //不断循环从队列里取出订单id
35 String orderId=null;
36 try {
37 orderId = writeRedisService.rpop(Constans.CACHE_ORDERIDS);
38 if(orderId!=null){
39 logger.info("pop orderId:"+orderId);
//将获取的订单编号交给订单统计任务处理线程处理
40 threadPool.submit(new CalculateAmount(Integer.parseInt(orderId),new Date()));
41 }
42 } catch (Exception e1) {
43 logger.error("",e1);
44 }
45 //根据上线后的业务反馈来确定是否改成wait/notify策略来及时处理确认的订单
46 try {
47 Thread.sleep(10);
48 } catch (InterruptedException e) {
49 logger.error("",e);
50 // Thread.currentThread().interrupt();
51 //stop=true;//线程被打算继续执行,不应该被关闭,保证该线程永远不会死掉
52 }
53 }
54 }
55
56 public static void stop(){
57 stop=true;
58 }
59
60 }
61
62 }
CalculateAmoiunt.java 订单任务处理
1 public class CalculateAmount implements Runnable {
2 private static final Log logger = LogFactory.getLog(CalculateAmount.class);
3 private int orderId;
4 private Date now;//确认时间 这个时间有一定的延迟,基本可以忽略,如果没什么用
5 private OrderService orderServices;
6 private OrdHaveProductService ordHaveProductService;
7 private OrdPayByCashbackService ordPayByCashbackService;
8 private OrdPayByCouponService ordPayByCouponService;
9 private OrdPayByGiftCardService ordPayByGiftCardService;
10 private StatisticsService statisticsService;
11 private WriteRedisService writeRedisService;
12 private ReadRedisService readRedisService;
13 private ErpOrderGoodsService erpOrderGoodsService;
14 private ErpOrderService erpOrderService;
15
16
17 public CalculateAmount(int orderId,Date now) {
18 super();
19 this.orderId = orderId;
20 this.now=now;
21 orderServices=SpringContext.getBean(OrderService.class);
22 ordHaveProductService=SpringContext.getBean(OrdHaveProductService.class);
23 ordPayByCashbackService=SpringContext.getBean(OrdPayByCashbackService.class);
24 ordPayByCouponService=SpringContext.getBean(OrdPayByCouponService.class);
25 ordPayByGiftCardService=SpringContext.getBean(OrdPayByGiftCardService.class);
26 statisticsService=SpringContext.getBean(StatisticsService.class);
27 writeRedisService=SpringContext.getBean(WriteRedisService.class);
28 readRedisService=SpringContext.getBean(ReadRedisService.class);
29 erpOrderGoodsService=SpringContext.getBean(ErpOrderGoodsService.class);
30 erpOrderService=SpringContext.getBean(ErpOrderService.class);
31 }
32
33 @Override
34 public void run() {
35 logger.info("CalculateAmount task run start........orderId:"+orderId);
36 StopWatch watch=new StopWatch();
37 watch.start();
38 /**
39 * 取出订单相关的所有数据同步到统计的库中
40 */
41 //TODO 考虑要不要将下面所有操作放到一个事务里面
42 List<Map<String, Object>> orders = this.orderServices.selectOrderById(orderId);
43 if(orders!=null&&orders.size()>0){
44 Map<String, Object> order = orders.get(0);
45
46 String orderSN=U.nvl(order.get("OrderSN"));//订单编号
47 Integer userId=U.nvlInt(order.get("usr_UserID"),null);//用户d
48 Integer status=U.nvlInt(order.get("Status"),null);//状态
49 Date createTime=now;//(Date)order.get("CreateTime");//创建时间
50 Date modifyTime=now;//(Date)order.get("ModifyTime");// 更新时间
51 BigDecimal discountPrice=U.nvlDecimal(order.get("DiscountPrice"),null);//优惠总额 满减金额
52 BigDecimal payPrice=U.nvlDecimal(order.get("PayPrice"), null);//实付金额
53 BigDecimal totalPrice=U.nvlDecimal(order.get("TotalPrice"), null);//总金额
54
55 //从erp里查询出订单的确认时间
56 int dbConfirmTime=0;
57 try {
58 dbConfirmTime = this.erpOrderService.selectConfirmTimeByOrderId(orderId);
59 } catch (Exception e2) {
60 logger.error("",e2);
61 }
62 Date ct=new Date(dbConfirmTime*1000L);
63
64 int[] dates=U.getYearMonthDayHour(ct);//
65 if(modifyTime!=null){
66 dates=U.getYearMonthDayHour(modifyTime);//
67 }
68 int year=dates[0];//年
69 int month=dates[1];//月
70 int day=dates[2];//日
71 int hour=dates[3];//小时
72
73 String ordersId=orderId+"";//生成订单id
74
75 //查询订单的来源和搜索引擎关键字
76 String source="";
77 String seKeyWords="";
78 List<OrdersData> orderDataList=this.statisticsService.selectOrdersDataByOrdersId(orderSN);
79 if(orderDataList!=null&&!orderDataList.isEmpty()){
80 OrdersData ordersData = orderDataList.get(0);
81 source=ordersData.getSource();
82 seKeyWords=ordersData.getSeKeyWords();
83 }
84
85 //TODO 将订单入库
86 ArrayList<RelOrders> relOrdersList = Lists.newArrayList();
87 RelOrders relOrders=new RelOrders(orderSN,userId+"",Byte.valueOf(status+""),source,seKeyWords,IsCal.未计算.getFlag(),(byte)U.getSimpleYearByYear(year),(byte)month,(byte)day,(byte)hour,ct,createTime,modifyTime);
88 relOrdersList.add(relOrders);
89
90 try {
91 relOrders.setConfirmTime(ct);
92 //查询RelOrders是否存在
93 RelOrders dbOrders=this.statisticsService.selectByPrimaryKey(orderSN);
94 if(dbOrders!=null){
95 //更新
96 dbOrders.setStatus(Byte.valueOf(status+""));
97 dbOrders.setConfirmTime(ct);
98 dbOrders.setModifyTime(modifyTime);
99 this.statisticsService.updateByPrimaryKeySelective(dbOrders);
100 return;
101 }else{
102 Integer relResult=this.statisticsService.insertRelOrdersBatch(relOrdersList);
103 }
104 } catch (Exception e) {
105 logger.error("insertRelOrdersBatch error",e);
106 }
107 /**
108 * 查这个订单的返现、优惠券、礼品卡 的金额
109 */
110 List<Map<String, Object>> cashs = this.ordPayByCashbackService.selectDecutionPriceByOrderId(orderId);
111 List<Map<String, Object>> coupons = this.ordPayByCouponService.selectDecutionPriceByOrderId(orderId);
112
113 BigDecimal cashAmount=U.getValueByKey(cashs, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//返现金额
114 BigDecimal couponAmont=U.getValueByKey(coupons, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//红包金额
115 /**
116 * 查询出这个订单的所有商品
117 */
118 List<Map<String, Object>> products=null;
119 Map<String,Object> productToKeyWordMap=Maps.newHashMap();
120 try {
121 products = this.ordHaveProductService.selectByOrderId(orderId);
122 List<OrdersItemData> ordersItemDataList=this.statisticsService.selectOrdersItemDataByOrdersId(orderSN);
123 if(ordersItemDataList!=null){
124 for (OrdersItemData ordersItemData : ordersItemDataList) {
125 productToKeyWordMap.put(ordersItemData.getItemId(), ordersItemData.getKeyWords());
126 }
127 }
128 } catch (Exception e1) {
129 logger.error("",e1);
130 }
131 if(products!=null){
132 ArrayList<RelOrdersItem> relOrdersItemList = Lists.newArrayList();
133 for (Map<String, Object> product : products) {
134 Integer productId=U.nvlInt(product.get("pro_ProductID"), null);//商品Id
135 Integer buyNo=U.nvlInt(product.get("BuyNo"), 0);//购买数量
136 String SN=U.nvl(product.get("SN"),"");
137 BigDecimal buyPrice=U.nvlDecimal(product.get("BuyPrice"), BigDecimal.ZERO);//购买价格
138 BigDecimal buyTotalPrice=U.nvlDecimal(product.get("BuyTotalPrice"), null);//购买总价格
139 BigDecimal productPayPrice=U.nvlDecimal(product.get("PayPrice"), null);//单品实付金额
140
141 BigDecimal cost=null;//商品成本 TODO 调别人的接口
142 BigDecimal realtimeAmount=null;//实付金额
143
144 BigDecimal pdCashAmount=BigDecimal.ZERO;//每个商品的返现
145 BigDecimal pdcouponAmont=BigDecimal.ZERO;//每个商品的优惠券
146
147 //商品价格所占订单比例
148 if(buyTotalPrice!=null&&totalPrice!=null&&totalPrice.doubleValue()!=0){
149 pdCashAmount=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(cashAmount).setScale(2,BigDecimal.ROUND_HALF_UP);
150 pdcouponAmont=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(couponAmont).setScale(2,BigDecimal.ROUND_HALF_UP);
151 discountPrice=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(discountPrice).setScale(2,BigDecimal.ROUND_HALF_UP);
152 }
153
154 realtimeAmount=buyTotalPrice.subtract((pdCashAmount.add(pdcouponAmont).add(discountPrice))).setScale(2,BigDecimal.ROUND_HALF_UP);
155
156 RelOrdersItem item=new RelOrdersItem(U.randomUUID(),orderSN,productId,SN,buyNo,realtimeAmount,U.nvl(productToKeyWordMap.get(productId)));
157
158 relOrdersItemList.add(item);
159
160 //如果确认时间属于同一天的话,将商品实付金额放入到redis排行榜中
161 if((status==1||status==5||status==6||status==7||status==11)&&DateUtils.isSameDay(new Date(), ct)){
162 //如果订单的状态是这几种,刚将该商品加入到实付金额的排行 榜中
163 dates=U.getYearMonthDayHour(ct);//
164 int days=dates[2];
165 //某一个商品某一天的实付金额
166 BigDecimal itemRelAmount=BigDecimal.ZERO;
167 //从redis里取出这个商品的实付金额,然后累加
168 String itemRelAmountStr=readRedisService.get(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
169 if(StringUtils.isNotBlank(itemRelAmountStr)){
170 itemRelAmount=new BigDecimal(itemRelAmountStr);
171 }
172 realtimeAmount=itemRelAmount.add(realtimeAmount);
173 writeRedisService.set(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days, realtimeAmount.toPlainString());
174 writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
175 writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTSS_KEY+days, realtimeAmount.doubleValue(), productId+"");
176 //确认的销量
177 Long itemCount= writeRedisService.incrBy(Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days,buyNo);
178 writeRedisService.zadd(Constans.CACHE_ITEMSALES_SS_KEY_PRDFIX+days, itemCount, productId+"");
179
180 String itemType="";
181 Map<String, String> pMap = this.readRedisService.hmget(Constans.CACHE_PRODUCT_KEY+productId);
182 itemType=pMap.get("categoryId");
183 if(StringUtils.isNotBlank(itemType)){
184 if(ProductCategory.isGuanBai(itemType)){
185 //如果是白酒 官白的访客数排行
186 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTWHITESS_KEY+days, realtimeAmount.doubleValue(), productId+"");//
187 //确认的销量排行
188 this.writeRedisService.zadd(Constans.CACHE_ITEMSALESWHITE_SS_KEY_PRDFIX+days, itemCount, productId+"");//
189 }else if(ProductCategory.isGuanHong(itemType)){
190 //官红的访客数排行
191 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTREDSS_KEY+days, realtimeAmount.doubleValue(), productId+"");//
192 //确认的销量排行
193 this.writeRedisService.zadd(Constans.CACHE_ITEMSALESRED_SS_KEY_PRDFIX+days, itemCount, productId+"");//
194 }
195 }
196
197 //某一个商品的销量加入删除列表
198 writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
199 }
200 }
201 try {
202 //TODO 将订单商品明细入库
203 this.statisticsService.insertRelOrdersItemBatch(relOrdersItemList);
204 //再将订单的状态改为已计算
205 this.statisticsService.updateIsCal(orderSN,IsCal.已计算.getFlag());//将是否计算改成已计算
206 //该订单的所有商品的成本同步到现在的库中。
207 this.calOrderProductCostSync(orderId,orderSN,products);
208 } catch (Exception e) {
209 logger.error("insertRelOrdersItemBatch or updateIsCal error",e);
210 }
211 }
212 }
213 watch.stop();
214 logger.info("CalculateAmount task run end........total cost time:"+watch.getTime()+" orderId:"+orderId);
215 }
216
217 private void calOrderProductCostSync(int orderId,String orderSN,List<Map<String, Object>> products){
218 List<Map<String, Object>> ordersList = this.erpOrderGoodsService.selectProductCostByOrderSN(orderSN);
219 if(ordersList==null||ordersList.isEmpty()){
220 logger.error("according orderId to query some data from erp return is null.........");
221 return;
222 }
223 Map<String, String> itemIdToItemSnMap = U.convertToMapByList(products, "pro_ProductID", "SN");
224
225 List<RelItemCosts> list=Lists.newArrayList();
226 for (Map<String, Object> map : ordersList) {
227 RelItemCosts itemCost=new RelItemCosts();
228 if(map==null){
229 continue;
230 }
231 Integer itemId=U.nvlInt(map.get("goods_id"),-99);
232 BigDecimal costs=U.nvlDecimal(map.get("Dynamic_price"), BigDecimal.ZERO);
233 itemCost.setId(U.randomUUID());
234 itemCost.setOrdersId(orderId+"");
235 itemCost.setOrdersNo(orderSN);
236 itemCost.setItemId(itemId);
237 itemCost.setItemNo(itemIdToItemSnMap.get(itemId+""));
238 itemCost.setCosts(costs);
239 itemCost.setCreateTime(new Date());
240 itemCost.setModifyTime(new Date());
241 list.add(itemCost);
242 }
243
244 this.statisticsService.insertRelItemCostsBatch(list);
245
246 }
247
248 }
注意:
1、redis2.6版本使用lpush、rpop出列的时候会丢失数据。换成2.8及以上的版本运行正常。
2、由于应用会部署到多个结点,所以无法直接采用java的BlockingQueue阻塞队列,帮采用redis提供的队列支持。
3、如果要做到统计的绝对实时,最好采用大数据的实时计算的解决方案:kafka+storm 来实现
以上为队列结合线程的实践案例,供大家一起探讨。
转载请注明出处 ,请大家尊重作者的劳动成果。
发表评论
-
redis持久化rdb aof简介
2015-11-18 16:04 688原文地址:http://chengjian ... -
Redis的Java客户端Jedis的八种调用方式(事务、管道、分布式…)介绍
2015-11-18 15:15 461原文地址:http://blog.csdn.net/truon ... -
Jedis 与 ShardedJedis 设计
2015-11-04 14:03 533原文地址:http://yychao.iteye.com/bl ... -
Redis学习笔记11--Redis分布式
2015-07-20 15:23 472原文地址:http://blog.csdn.net/freeb ... -
Redis-Dump:将Redis数据dump成json格式
2013-08-19 15:03 1857作者:nosqlfan on 星期天, 一月 8, 2 ...
相关推荐
07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-Redis队列Stream、Redis多线程详解_ev.07-...
在C#编程中,队列(Queue)是一种线性数据结构,它遵循“先进先出”(First In First Out, FIFO)的原则。队列在多线程环境下常常用于任务...理解这些基本概念和操作对于开发涉及多线程和队列的C#应用程序至关重要。
标题中的“Redis延时消息队列基于swoole实现的多进程消费端”是指使用Redis作为消息队列,结合Swoole的多进程特性来构建一个高效、可扩展的延迟消息处理系统。在这个系统中,Redis作为一个可靠的键值存储,用于暂存...
redis 案例。包含, 队列操作, socket通信, 以及 socket 和 redis 配合 redis 案例。包含, 队列操作, socket通信, 以及 socket 和 redis 配合
为了保证多线程安全,可以使用`QRedis`的同步操作或使用Qt的信号槽机制,当有新消息时自动触发处理函数。 5. **异常处理**:考虑到网络问题和Redis服务可能出现的异常,需要在代码中添加适当的错误处理和重试机制,...
在.NET MVC应用中,使用Redis实现抢购队列是一种高效且可扩展的解决方案。Redis作为一个高性能的键值存储系统,其支持多种数据结构,如字符串、哈希表、列表、集合等,使得它非常适合用于构建高并发场景下的队列服务...
标签"redis队列"提示我们,Redis不仅仅可以作为发布/订阅系统,还可以用于构建消息队列,例如使用`RPOPLPUSH`或`BLPOP`命令实现先进先出(FIFO)队列。然而,对于大型高并发系统,可能需要考虑使用Redis的Stream或更...
毕业设计可以包括文献综述、需求分析、方案设计、实施与测试等多个阶段,以确保整个过程的科学性和系统性。 其次,毕业设计的完成通常需要学生具备一定的独立思考和解决问题的能力。在研究过程中,学生可能需要采用...
例如,当用户发起抽奖请求时,请求会被放入Redis队列,然后后台按照先进先出的原则进行处理,避免了大量的并发操作导致的系统压力。 4. **队列处理**:在抽奖系统中,使用Redis队列可以实现异步处理,将抽奖逻辑从...
Goroutines允许多个任务并发执行,而Channel则提供了一种安全的数据共享方式,避免了多线程环境下的数据竞争问题。 1. **并发队列处理**:在Go中,可以创建多个Goroutine来消费Redis中的消息队列。每个Goroutine...
3. 单线程结构:Redis 采用了单线程结构,能够提高系统的性能和可靠性。 4. 事件驱动:Redis 使用了事件驱动机制,能够高效地处理大量的请求。 Redis 应用实践 Redis 的应用非常广泛,以下是一些常见的应用场景: ...
Golang的goroutine和channel特性使得Go-Delayer能轻松实现多线程并发,提高处理速度。 对于客户端支持,Go-Delayer提供了Golang原生的API,同时也支持PHP等其他语言的客户端接入。这使得无论你的服务端环境是什么...
Redis 作为一种内存数据库,以其高性能、低延迟的特点,被广泛应用于消息队列场景。本文将详细介绍如何利用 Java 和 Jedis(Java 客户端库)来实现基于 Redis 的消息队列。 #### 二、Jedis 库简介 Jedis 是一个高...
5. **多线程编程**:如果应用需要并发处理多个Redis请求,LabVIEW的多线程或者任务调度功能将被用到,以确保高效的数据处理。 6. **LabVIEW插件开发**:如果包含自定义LabVIEW VI(虚拟仪器),可能涉及到LabVIEW的...
单线程模型的优势在于避免了多线程环境下的竞态条件和锁同步问题,简化了代码的复杂性,提高了可维护性。此外,由于所有操作都在同一个线程中执行,Redis可以确保命令的顺序执行,这对于一些需要事务或原子性的操作...
Redis是一款高性能的键值存储系统,广泛应用于缓存、消息队列、计数器等多个场景。下面将详细探讨Redis的核心原理和常见应用实践。 1. **Redis核心原理**: - **数据结构**:Redis基于多种高效的数据结构,如字符...