Part One: 一个线程处理
if (CollectionUtils.isNotEmpty(cqcAttachmentDataList)) {
long startTimes = System.currentTimeMillis();
final CountDownLatch latchOnlyOne = new CountDownLatch(1);
new Thread(new CqcAttachmentDataWorker(latchOnlyOne,"车前程附件数据处理-线程-only", cqcAttachmentDataList)).start();
try {
latchOnlyOne.await();
long endTimes = System.currentTimeMillis();
logger.info("所有线程(1个线程)执行完毕:" + ((endTimes - startTimes)/1000) + "秒");
} catch (InterruptedException e) {
logger.error("线程(1个线程)处理异常::::",e);
}
}
//数据处理线程worker
public class CqcAttachmentDataWorker implements Runnable{
List<TCdCqcAttachmentData> cqcAttachmentDataSubList = null;
String name = "";
CountDownLatch latch;
public CqcAttachmentDataWorker(CountDownLatch latch,String name, List<TCdCqcAttachmentData> cqcAttachmentDataListTemp){
this.name = name;
this.latch = latch;
this.cqcAttachmentDataSubList = cqcAttachmentDataListTemp;
}
@Override
public void run() {
logger.info(name + "正在执行...");
try {
//2.遍历处理
if (CollectionUtils.isNotEmpty(cqcAttachmentDataSubList)) {
//do it
}
} catch (Exception e) {
logger.error("车前程-数据处理run异常::::", e);
} finally {
latch.countDown();
}
}
}
Part Two: 多线程数据处理1-初级
List<TCdCqcAttachmentData> cqcAttachmentDataList = tCdCqcAttachmentDataService.findTCdCqcAttachmentDataAllList();
if (CollectionUtils.isNotEmpty(cqcAttachmentDataList)) {
long startTimes = System.currentTimeMillis();
int threadCount = cqcWorkerThreadCount;
int total = cqcAttachmentDataList.size();
if (total < threadCount) {
final CountDownLatch latchOnlyOne = new CountDownLatch(1);
new Thread(new CqcAttachmentDataWorker(latchOnlyOne,"车前程附件数据处理-线程-only", cqcAttachmentDataList)).start();
try {
latchOnlyOne.await();
long endTimes = System.currentTimeMillis();
logger.info("所有线程(1个线程)执行完毕:" + ((endTimes - startTimes)/1000) + "秒");
} catch (InterruptedException e) {
logger.error("线程(1个线程)处理异常::::",e);
}
} else {
int every = total / threadCount;
final CountDownLatch latch = new CountDownLatch(threadCount);
List<TCdCqcAttachmentData> cqcAttachmentDataSubList = null;
int divideRemainNumber = total % every; //取模后的余数
int divideRemainNumberTmep = total % threadCount; //取模后的余数
for (int i = 1; i <= threadCount; i++) {
int startIndex = (i - 1) * every;
int endIndex = startIndex + every;
if (total >= endIndex) {
if(divideRemainNumber > 0){
if (i == threadCount)
{
cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, total);
} else {
cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, startIndex + every);
}
} else {
if (divideRemainNumberTmep > 0) {
if (i == threadCount)
{
cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, total);
} else {
cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, startIndex + every);
}
} else {
cqcAttachmentDataSubList = cqcAttachmentDataList.subList(startIndex, startIndex + every);
}
}
}
new Thread(new CqcAttachmentDataWorker(latch,"车前程附件数据处理-线程" + i, cqcAttachmentDataSubList)).start();
}
try {
latch.await();
long endTimes = System.currentTimeMillis();
logger.info("所有线程执行完毕:" + ((endTimes - startTimes)/1000) + "秒");
} catch (InterruptedException e) {
logger.error("线程处理异常::::",e);
}
}
}
//数据处理线程worker
public class CqcAttachmentDataWorker implements Runnable{
List<TCdCqcAttachmentData> cqcAttachmentDataSubList = null;
String name = "";
CountDownLatch latch;
public CqcAttachmentDataWorker(CountDownLatch latch,String name, List<TCdCqcAttachmentData> cqcAttachmentDataListTemp){
this.name = name;
this.latch = latch;
this.cqcAttachmentDataSubList = cqcAttachmentDataListTemp;
}
@Override
public void run() {
logger.info(name + "正在执行...");
//2.遍历处理
if (CollectionUtils.isNotEmpty(cqcAttachmentDataSubList)) {
}
latch.countDown();
}
}
Part Three: 多线程数据处理2-高级-线程池实现
代码实现:
// 多线程处理
private static final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 10, 1, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(50000), new ThreadPoolExecutor.CallerRunsPolicy());
if (CollectionUtils.isNotEmpty(ycAndyzTCdBusiExtendlist)) {
LinkedBlockingQueue<TCdBusiExtend> queue = new LinkedBlockingQueue<TCdBusiExtend>();
queue.addAll(ycAndyzTCdBusiExtendlist);
int size = queue.size();
CountDownLatch countDownLatch = new CountDownLatch(size);
for (int i = 0; i < size; i++) {
threadPool.execute(new CqcAttachmentDataHandler(queue, countDownLatch, cqcHandlerServiceFactory, cqcAttachmentDomain, cqcAttachmentFolder));
}
countDownLatch.await();
long poolEndTimes = System.currentTimeMillis();
logger.info("线程池对目标数据处理执行完毕:" + ((poolEndTimes - targetBeginTimes)/1000) + "秒");
long endTimes = System.currentTimeMillis();
logger.info("全部执行完毕:" + ((endTimes - startTimes)/1000) + "秒");
}
worker:
public class CqcAttachmentDataHandler implements Runnable {
private static final AppLogger logger = new AppLogger(CqcAttachmentDataHandler.class);
private LinkedBlockingQueue<TCdBusiExtend> queue;
private CountDownLatch countDownLatch;
private String cqcAttachmentDomain;
private String cqcAttachmentFolder;
private CqcHandlerServiceFactory cqcHandlerServiceFactory;
public CqcAttachmentDataHandler(LinkedBlockingQueue<TCdBusiExtend> queue, CountDownLatch countDownLatch,CqcHandlerServiceFactory cqcHandlerServiceFactoryTemp, String cqcAttachmentDomainTemp, String cqcAttachmentFolderTemp) {
this.queue = queue;
this.countDownLatch = countDownLatch;
this.cqcHandlerServiceFactory=cqcHandlerServiceFactoryTemp;
this.cqcAttachmentDomain = cqcAttachmentDomainTemp;
this.cqcAttachmentFolder = cqcAttachmentFolderTemp;
}
@Override
public void run() {
try {
TCdBusiExtend tCdBusiExtend = queue.poll();
if(tCdBusiExtend != null) {
//do it
}
} catch (Exception e) {
logger.error("CqcAttachmentDataHandler.run error", e);
} finally {
countDownLatch.countDown();
}
}
}
工厂模式:
@Service
public class CqcHandlerServiceFactory {
@Resource
private ITCdCqcAttachmentDataService tCdCqcAttachmentDataService;
@Autowired
private TCdBusiExtendMapper tCdBusiExtendMapper;
public ITCdCqcAttachmentDataService gettCdCqcAttachmentDataService() {
return tCdCqcAttachmentDataService;
}
public void settCdCqcAttachmentDataService(ITCdCqcAttachmentDataService tCdCqcAttachmentDataService) {
this.tCdCqcAttachmentDataService = tCdCqcAttachmentDataService;
}
public TCdBusiExtendMapper gettCdBusiExtendMapper() {
return tCdBusiExtendMapper;
}
public void settCdBusiExtendMapper(TCdBusiExtendMapper tCdBusiExtendMapper) {
this.tCdBusiExtendMapper = tCdBusiExtendMapper;
}
}
==========================================================
Part Four: ThreadPoolTaskExecutor的使用
<!--核心线程池--> <beanid="taskExecutor" name="taskExecutor_passive" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <propertyname="corePoolSize" value="40"/> <propertyname="maxPoolSize" value="100"/> <propertyname="queueCapacity" value="3072"/> <propertyname="rejectedExecutionHandler"> <beanclass="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /><!--调用者运行--> </property> </bean>
/** * 线程池 */ @Resource(name = "taskExecutor") private ThreadPoolTaskExecutor cartPropertyDataGetterTaskExecutor;
public ThreadPoolTaskExecutor getTaskExecutor() { return cartPropertyDataGetterTaskExecutor; } public void setTaskExecutor(ThreadPoolTaskExecutor taskExecutor) { this.cartPropertyDataGetterTaskExecutor = taskExecutor; }
// 命令立减查询 if (result.isActivateKeyt() && currentCart.getKeyt() != null && StringUtil.isNotBlank(currentCart.getKeyt().getKeytStr())) { keytFuture = this.getTaskExecutor().submit(new KeytGetter(keytBS, user, siteId, currentCart.getKeyt().getKeytStr(), currentCart, commerceItemList, priceInfo, shippingGroups)); } if (blueCouponsFuture != null) { final BlueCouponSummary blueCouponSummary = buildBlueCouponSummary(blueCouponsFuture.get()); result.setBlueCouponsAmount(blueCouponSummary.getUsedAmount()); // 蓝券的使用金额 result.setBlueCouponsNum(blueCouponSummary.getUsedNum()); // 蓝券的使用张数 result.setBlueCoupons(blueCouponSummary.getList()); } if (shopCouponsFuture != null) { final ShopCouponSummary shopCouponSummary = buildShopCouponSummary(shopCouponsFuture.get()); result.setShopCouponsAmount(shopCouponSummary.getUsedAmount()); // 店铺券使用金额 result.setShopCouponsNum(shopCouponSummary.getUsedNum()); // 店铺券使用张数 result.setQueryShopCoupons(shopCouponSummary.getList()); } if (redCouponsFuture != null) { final RedCouponSummary redCouponSummary = buildRedCouponSummary(redCouponsFuture.get()); result.setRedCouponsAmount(redCouponSummary.getUsedAmount()); // 红券使用的金额 result.setRedCouponsNum(redCouponSummary.getUsedNum()); // 红券使用的张数 result.setRedCoupons(redCouponSummary.getList()); } if (prepaidCardFuture != null) { final PrepaidCardSummary prepaidCardSummary = buildPrepaidCardSummary(prepaidCardFuture.get()); result.setPrepaidCardsAmount(prepaidCardSummary.getUsedAmount()); // 预付卡使用的金额 result.setPrepaidCardsNum(prepaidCardSummary.getUsedNum()); // 预付卡使用的张数 result.setPrepaidCards(prepaidCardSummary.getList()); } VirtualAccountResult virtualAccountResult = null; if (virtualAccountResultFuture != null) { virtualAccountResult = virtualAccountResultFuture.get(); result.setVirtualAccountResult(virtualAccountResult); } if ((containsHaiWaiGou || !shouldQueryVirtualAccount(siteId, commerceItemList)) && virtualAccountResult != null) { virtualAccountResult.setPayAmount(DOUBLE_ZERO); // 可用金额为0 }
if (blueCouponsFuture != null) { final BlueCouponSummary blueCouponSummary = buildBlueCouponSummary(blueCouponsFuture.get()); result.setBlueCouponsAmount(blueCouponSummary.getUsedAmount()); // 蓝券的使用金额 result.setBlueCouponsNum(blueCouponSummary.getUsedNum()); // 蓝券的使用张数 result.setBlueCoupons(blueCouponSummary.getList()); } if (shopCouponsFuture != null) { final ShopCouponSummary shopCouponSummary = buildShopCouponSummary(shopCouponsFuture.get()); result.setShopCouponsAmount(shopCouponSummary.getUsedAmount()); // 店铺券使用金额 result.setShopCouponsNum(shopCouponSummary.getUsedNum()); // 店铺券使用张数 result.setQueryShopCoupons(shopCouponSummary.getList()); } if (redCouponsFuture != null) { final RedCouponSummary redCouponSummary = buildRedCouponSummary(redCouponsFuture.get()); result.setRedCouponsAmount(redCouponSummary.getUsedAmount()); // 红券使用的金额 result.setRedCouponsNum(redCouponSummary.getUsedNum()); // 红券使用的张数 result.setRedCoupons(redCouponSummary.getList()); } if (prepaidCardFuture != null) { final PrepaidCardSummary prepaidCardSummary = buildPrepaidCardSummary(prepaidCardFuture.get()); result.setPrepaidCardsAmount(prepaidCardSummary.getUsedAmount()); // 预付卡使用的金额 result.setPrepaidCardsNum(prepaidCardSummary.getUsedNum()); // 预付卡使用的张数 result.setPrepaidCards(prepaidCardSummary.getList()); } VirtualAccountResult virtualAccountResult = null; if (virtualAccountResultFuture != null) { virtualAccountResult = virtualAccountResultFuture.get(); result.setVirtualAccountResult(virtualAccountResult); } if ((containsHaiWaiGou || !shouldQueryVirtualAccount(siteId, commerceItemList)) && virtualAccountResult != null) { virtualAccountResult.setPayAmount(DOUBLE_ZERO); // 可用金额为0 } if (storePointFuture != null) { result.setStorePoint(storePointFuture.get()); } if (gomedoFuture != null) { result.setGomedo(gomedoFuture.get()); } if (keytFuture != null) { result.setKeyt(keytFuture.get()); }
package com.gome.pangu.trading.cart.business.taskflow.listcart.task; import java.util.List; import java.util.concurrent.Callable; import com.gome.framework.bleach.Bleacher; import com.gome.framework.logging.Logger; import com.gome.pangu.pricing.client.dto.result.PriceInfoResultRDTO; import com.gome.pangu.trading.bo.Cart; import com.gome.pangu.trading.bo.CommerceItem; import com.gome.pangu.trading.bo.ShippingGroup; import com.gome.pangu.trading.bo.User; import com.gome.pangu.trading.keyt.business.KeytBS; import com.gome.pangu.trading.keyt.client.dto.result.Keyt; /** * 命令立减查询服务Callable实现类 * Created by huangwenfeng on 2016/9/28. */ public class KeytGetter implements Callable<Keyt> { private static final Logger LOGGER = Bleacher.getLogger(KeytGetter.class); private final KeytBS keytBS; private final User user; private final String siteId; private final String keytStr; private final Cart currentCart; private final List<CommerceItem> commerceItemList; private final PriceInfoResultRDTO priceInfoResultRDTO; private final List<ShippingGroup> shippingGroups; public KeytGetter(KeytBS keytBS, User user, String siteId, String keytStr, Cart currentCart, List<CommerceItem> commerceItemList, PriceInfoResultRDTO priceInfoResultRDTO, List<ShippingGroup> shippingGroups) { this.keytBS = keytBS; this.user = user; this.siteId = siteId; this.keytStr = keytStr; this.currentCart = currentCart; this.commerceItemList = commerceItemList; this.priceInfoResultRDTO = priceInfoResultRDTO; this.shippingGroups = shippingGroups; } @Override public Keyt call() throws Exception { try { Keyt keyt = keytBS.queryKeyt(user, siteId, keytStr, currentCart, commerceItemList, priceInfoResultRDTO, shippingGroups); return keyt; } catch (Exception e) { LOGGER.error("调用命令立减查询服务失败,错误信息:", e); LOGGER.error("调用命令立减查询服务失败,用户ID:{},站点ID:{},口令串:{},Cart:{},商品列表:{},价格信息:{},配送单:{}", user.getUserId(), siteId, keytStr, currentCart, commerceItemList, priceInfoResultRDTO, shippingGroups); return null; } } }
/** * 口令立减 * Created by huangwenfeng on 2016/9/26. */ public class Keyt extends BaseModel { private static final long serialVersionUID = 4824030816608236547L; private String keytStr; // 命令串 private double remainingAmount; // 剩余支付金额(OMS备用) private double amount = 0; // 命令立减兑换到的金额 private int status = 0; // 向前端返回状态错误码 0-初始状态(没有用命令立减) 1-正常(可用命令立减) public double getAmount() { return amount; } public void setAmount(double amount) { this.amount = amount; } public int getStatus() { return status; } public void setStatus(int status) { this.status = status; } public String getKeytStr() { return keytStr; } public void setKeytStr(String keytStr) { this.keytStr = keytStr; } public double getRemainingAmount() { return remainingAmount; } public void setRemainingAmount(double remainingAmount) { this.remainingAmount = remainingAmount; } }
相关推荐
Java多线程机制研究.kdh Java多线程机制研究.kdh Java多线程机制研究.kdh
### JAVA多线程案例教学详析 #### 一、引言 随着计算机技术的发展,多核处理器已经成为...通过上述介绍的学习资料,新入行的开发者可以快速掌握Java多线程编程的核心概念和技术实践,为进一步深入研究打下坚实的基础。
基于Java多线程的并行计算技术研究及应用
【Java多线程迷宫生成程序】是一种利用Java编程语言实现的、基于多线程技术的动态迷宫生成示例。这个小程序展示了如何在计算机图形学中应用多线程来实时生成随机迷宫,同时也提供了视觉动画效果,使得观察迷宫生成的...
Java多线程编程是Java开发中的...每个章节的源码都是一个独立的案例,可以逐一研究,实践和调试,以便更好地掌握Java多线程编程技巧。在学习过程中,结合理论知识与实际操作,将有助于提升对Java并发编程的全面认识。
Java多线程同步机制研究分析 Java多线程同步机制是Java编程语言中的一种机制,它允许多个线程同时执行,提高了系统资源的利用率和安全性。但是,多线程中最重要的问题是线程的同步和共享资源的访问保护。本文通过对...
在Java编程语言中,多线程是实现并发执行任务的关键技术。这个“java多线程控制的赛跑程序”是一个示例,展示了如何利用多线程来模拟一场赛跑...通过深入研究和分析,你可以进一步提升自己在Java多线程编程方面的技能。
"基于Java多线程同步的安全性研究" 本文主要研究了基于Java多线程同步的安全性问题,讨论了Java多线程同步机制的实现方法和安全性问题的解决方法。文章首先介绍了Java多线程同步的必要性和重要性,然后讨论了Java多...
### 基于Java的多线程网络爬虫设计与实现 #### 概述 本文档探讨了在Java环境下设计与实现多线程网络爬虫的技术细节与实践方法。网络爬虫(Web Crawler),是一种自动抓取互联网上网页信息的程序或自动化脚本,其...
Java多线程编程教学研究 多线程编程是Java教学中的难点,也是影响学生网络编程能力的一个重点。本文尝试结合操作系统课程中的部分理论及教学工具,通过导入线程的概念、绘制状态转换图和设计同步算法等方式,对Java...
标题和描述均指向了一个关于Java多线程设计模式的PDF文档的下载链接,这暗示了文档的主要内容将围绕Java中的多线程编程及其设计模式展开。在Java领域,多线程是一个核心概念,它允许程序执行多个任务同时进行,极大...
这个“JAVA-多线程 所有文件”压缩包很可能包含了一系列关于Java多线程学习的源代码示例和相关文档。下面我们将深入探讨Java多线程的相关知识点。 1. **线程的概念**:线程是操作系统分配CPU时间的基本单位,一个...
Java多线程技术在爬虫应用中的重要性不言而喻,它能显著提升图片抓取的效率。本文将深入探讨如何使用Java实现多线程爬虫,以及压缩包中的三个示例案例。 首先,我们需要了解Java中的多线程概念。在Java中,通过创建...
Java多线程机制研究 Java多线程机制研究是Java编程语言的一项重要特点,允许在一个Java程序内部同时进行多种运算,从而充分利用系统资源,提高程序运行效率。本文将详细论述在Java程序中创建线程和实现线程体的机制...
在分析Java 多线程特性的基础上, 探讨了Java 多线程的测试策略及测试方法, 提出Java 多线程测试由类测试、集成模块测试和系统测试三个层次组成, 并讨论了多线程的继承测试、同步测试以及效率测试。
"Java多线程通信机制研究" Java多线程通信机制是Java程序设计中的一个复杂技术,涉及到多个线程之间的通信和协作。多线程是一种程序运行机制,它允许在程序中并发执行多个指令流,每个指令流都被称为一个线程,彼此...
《基于Java多线程的HTTP代理服务器的研究与实现》这篇文档深入探讨了如何利用Java语言构建一个高效的多线程HTTP代理服务器。在信息技术领域,HTTP代理服务器扮演着至关重要的角色,它作为客户端与目标服务器之间的...
Java多线程笔记 Java多线程笔记是 Java 编程语言中关于多线程编程的笔记,涵盖了线程基础知识、线程优先级、线程状态、守护线程、构造线程、线程中断等多方面的内容。 获取简单 main 程序中的线程 在 Java 中,...