(一)
new Thread(new SolrDataHandleThread()).start();
new Thread(new ElasticSearchDataHandler()).start();
new Thread(new RedisDataHandler()).start();
private class SolrDataHandleThread implements Runnable {
public void run() {
log.info("in SolrDataHandleThread run()--->begin");
log.info("pageSize is--->" + pageSize);
//调saf接口 根据商家id查询商家管理系统oracle数据 如商家的所属运营人员及部门的数据
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getAllVenderAuthResultData();
if (resultDataAllList != null && resultDataAllList.size() > 0) {
int totalCount = resultDataAllList.size();
int totalPages = totalCount % pageSize > 0 ? (totalCount / pageSize + 1) : totalCount / pageSize;
List<VenderAuthResultData> resultDataTempList = new ArrayList<VenderAuthResultData>();
for (int page = 1; page <= totalPages; page++) {
int startIndex = (page - 1) * pageSize;
int endIndex = startIndex + pageSize;
if (totalCount > endIndex) {
resultDataTempList = resultDataAllList.subList(startIndex, endIndex);
} else {
resultDataTempList = resultDataAllList.subList(startIndex, totalCount);
}
Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
if (resultDataTempList != null && resultDataTempList.size() > 0) {
for (VenderAuthResultData vdl : resultDataTempList) {
VenderInfoDTO venderInfoDTO = penaltyBlackWhiteClient.getVenderInfoDtoByVenderId(vdl.getVenderId());
if (venderInfoDTO != null) {
//构建刷solr数据
SolrInputDocument doc = buildSolrInputDocument(vdl, venderInfoDTO);
docs.add(doc);
}
}
}
try {
solrServer92.add(docs);
solrServer92.commit();
} catch (Exception e) {
log.error("批量同步所有mysql数据到92solr服务器-刷新全部solr数据遇到错误", e);
}
try {
solrServer96.add(docs);
solrServer96.commit();
} catch (Exception e) {
log.error("批量同步所有mysql数据到96solr服务器-刷新全部solr数据遇到错误", e);
}
log.info("deal with data end, page number is--->" + page);
}
}
log.info("in SolrDataHandleThread run()--->end");
}
}
private class ElasticSearchDataHandler implements Runnable {
public void run() {
log.info("Elastic search thread begin to process data !");
boolean switchEs = ConfigCenterUtil.getSwitchConfig(SellerAuthStrategy.ES_SWITCH);
log.info("The switch of writing to ElasticSearch is opened :" + switchEs);
if(switchEs) {
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getAllVenderAuthResultData();
if (resultDataAllList != null && resultDataAllList.size() > 0) {
log.info("Size of data is " + resultDataAllList.size());
for (VenderAuthResultData vdl : resultDataAllList) {
VenderInfoDTO venderInfoDTO = penaltyBlackWhiteClient.getVenderInfoDtoByVenderId(vdl.getVenderId());
if (venderInfoDTO != null) {
SellerAuthDocument sellerAuthDocument = buildSellerAuthDocument(vdl, venderInfoDTO);
indexElasticSearchDoc(sellerAuthDocument);
}
}
}
}
log.info("Elastic search thread process data successfully !");
}
}
private class RedisDataHandler implements Runnable {
public void run() {
log.info("begin to synchronize the redis data in thread : RedisDataHandler, with page size: " + pageSize);
Calendar calendar = getLegalCalendar();
String[] monthArray = getMonths(calendar);
for(String m : monthArray) {
log.info("Begin to process sync data to redis server.with month : " + m);
List<VenderAuthResultData> resultDataAllList = venderAuthResultDataDao.getCurrentYearMonthVenderAuthResultDataByOpTime(m);
if(null != resultDataAllList && resultDataAllList.size() > 0) {
syncRedisDataProcess(resultDataAllList);
}else{
log.info("The synchronous data is empty. the month is " + m);
}
log.info("Finish to process sync data to redis server.with month : " + m);
}
log.info("finish to synchronize the redis data in thread : RedisDataHandler, with page size: " + pageSize);
}
}
===============================================================================
(二)
注入线程池:
private ThreadPoolUtil threadPoolUtil;
public void setThreadPoolUtil(ThreadPoolUtil threadPoolUtil) {
this.threadPoolUtil = threadPoolUtil;
}
bean配置:
<!-- 线程池util -->
<bean id="threadPoolUtil" class="com.jd.util.ThreadPoolUtil"/>
调用:
private void writeCustomsOpenStatusToRedis(Long customsId, int openStatus) {
try {
final int openStatusTmp = openStatus;
final Long customsIdTmp = customsId;
threadPoolUtil.getCachedThreadPool().execute(new Runnable() {
public void run() {
if(customsIdTmp == null) {
return;
}
if (openStatusTmp == 1) {
redisUtils.set(RedisConstants.CUSTOMS_OPEN_STATUS_SURFIX + customsIdTmp, String.valueOf(openStatusTmp));
} else {
redisUtils.del(RedisConstants.CUSTOMS_OPEN_STATUS_SURFIX + customsIdTmp);
}
}
});
}catch (Exception e) {
log.error("构建海关启用停用redis数据失败", e);
}
}
private void writeCustomsVenderToRedis(Long customsId, Long venderId, int customsVenderStatus) {
try {
final Long venderIdTemp = venderId;
final Long customsIdTmp = customsId;
final int customsVenderStatusTmp = customsVenderStatus;
threadPoolUtil.getCachedThreadPool().execute(new Runnable() {
public void run() {
if (customsVenderStatusTmp == 1) {
redisUtils.hset(RedisConstants.CUSTOMS_VENDER_ID_SURFIX + venderIdTemp, String.valueOf(customsIdTmp), "1");
} else {
redisUtils.hdel(RedisConstants.CUSTOMS_VENDER_ID_SURFIX + venderIdTemp, String.valueOf(customsIdTmp));
}
}
});
}catch (Exception e) {
log.error("构建海关店铺添加或移除redis数据失败", e);
}
}
线程池类ThreadPoolUtil.java:
package com.jd.util;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 创建获取线程池的util
* User: shaodong
* Date: 13-1-5
* Time: 下午12:46
* To change this template use File | Settings | File Templates.
*/
public class ThreadPoolUtil {
/**
* 创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行很多短期异步任务的程序而言,
* 这些线程池通常可提高程序性能。调用 execute 将重用以前构造的线程(如果线程可用)。
# 如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
*/
private final ExecutorService CACHED_THREAD_POOL = Executors.newCachedThreadPool();
/**
* 创建一个固定大小的线程池,最大50个,超过50个的时候,会阻塞等待
*/
private final ExecutorService FIXED_THREAD_POOL = Executors.newFixedThreadPool(50);
/**
* 获得固定大小的线程池
* @return
*/
public ExecutorService getCachedThreadPool() {
return CACHED_THREAD_POOL;
}
// public static void main(String args[]){
// new ThreadPoolUtil().m();
// }
//
// void m() {
// //创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。
// ExecutorService pool = Executors.newFixedThreadPool(50);
// //创建实现了runnable接口的对象
// for(int i=1; i<=100; i++) {
// pool.execute(new MyThread(i));
// }
// pool.shutdown();
// }
//
// class MyThread extends Thread{
// int i = 0;
// public MyThread(int _i){
// System.out.println("create thread:"+_i);
// i = _i;
// }
//
// @Override
// public void run(){
// try {
// Thread.sleep(2000);
// } catch (InterruptedException e) {
// e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
// }
// System.out.println(Thread.currentThread().getName()+" is running... thread:"+i);
// }
// }
}
相关推荐
java线程池使用后到底要关闭吗 java线程池是一种高效的并发编程技术,可以帮助开发者更好地管理线程资源,提高系统的性能和可靠性。然而,在使用java线程池时,一个常见的问题是:使用完线程池后到底要不要关闭?...
在深入理解高并发编程,尤其是Java线程池核心技术时,我们首先要明白线程与多线程的概念。线程是操作系统中的基本调度单元,它比进程更小,且基本不拥有系统资源,主要由程序计数器、寄存器和栈等组成。在同一个进程...
Java 线程池是 Java 语言中的一个重要概念,它允许开发者创建和管理多个线程,以提高程序的并发性和性能。下面是对给定文件的解析,包括 title、description、标签和部分内容的解析。 标题解析 标题 "Java 线程池...
Java 多线程-Socket 编程是指在 Java 语言中使用多线程技术来实现网络编程,特别是使用 Socket 编程来实现客户端和服务器端的通信。在 Java 中,多线程可以使用 Thread 类和 Runnable 接口来实现,而 Socket 编程则...
线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池可以有效地控制运行的线程数量,当线程数量过多时,可以适当控制新的线程的创建,避免过多线程导致系统资源的...
Java中的线程池是多线程编程中一种高效、可管理的执行机制。它通过预先创建并维护一组线程,避免了频繁地创建和销毁线程带来的开销,从而提高了程序的性能和响应速度。线程池的核心概念包括以下几个方面: 1. **...
线程池是一种多线程处理形式,预先创建了若干个线程,当有任务需要执行时,会从线程池中取出一个线程来执行任务,任务执行完毕后,线程返回线程池中等待新的任务。这种机制避免了频繁创建和销毁线程带来的性能开销,...
### Java多线程与并发(17-26)-JUC线程池-FutureTask详解 #### 一、概述 本文将围绕Java多线程与并发中的重要概念——`FutureTask`进行深入探讨。`FutureTask`是Java并发库中的一个关键组件,它实现了`...
通过合理使用Java线程池,开发者可以更加高效地管理线程资源,提高应用程序的性能和稳定性。线程池的使用也应当注意避免资源竞争、线程死锁以及可能的内存泄漏等问题,确保线程安全和高效的并行处理能力。
在Java中,使用`ExecutorService`和`ThreadPoolExecutor`来创建线程池,而不是直接使用`Executors`,因为`Executors`创建的线程池可能会导致资源耗尽的问题。 集合是Java中存储数据的主要工具,包括List、Set和Map...
Java线程池是一种高级的多线程处理框架,它是Java并发编程中非常重要的一个组件。线程池的原理和实现涉及到操作系统调度、内存管理和并发控制等多个方面。理解线程池的工作原理有助于优化程序性能,避免过度创建和...
计算机后端-Java-Java核心基础-第20章 多线程 19. 使用线程池的好处.avi
在Java编程中,多线程是并发编程的重要组成部分,它允许程序同时执行多个任务,从而提高了系统的效率和响应性。然而,在某些场景下,我们可能需要控制线程的执行顺序,确保它们按照特定的顺序交替运行,这在并发编程...
线程池管理和多线程上传是并发编程中的一个重要实践,特别是在大数据传输和网络服务中。在Java等编程语言中,线程池通过有效地管理和复用线程资源,避免了频繁创建和销毁线程带来的开销,提升了系统性能。下面将详细...
目标:Java中多线程技术是一个难点,但是也是一个核心技术。因为Java本身就是一个多线程语言。本人目前在给46班讲授Swing的网络编程--使用Swing来模拟真实的QQ实时聊天软件。因为涉及到Socket编程,所以一定会使用多...
本文将深入探讨如何在Winform应用中使用异步多线程和线程池。 一、线程基础 线程是操作系统分配CPU时间的基本单元,每个进程至少包含一个线程。在C#中,可以使用`System.Threading.Thread`类来创建和管理线程。通过...
在本项目实践中,我们将深入探讨Java中的多线程与高并发技术,特别是在人工智能领域的应用。Java作为一种广泛应用的编程语言,其强大的并发处理能力是其在大规模数据处理和高性能计算场景中备受青睐的原因之一。本...
总结来说,自定义Java线程池的实现涉及到线程的管理和调度,包括核心线程的维护、任务队列的使用以及拒绝策略的设定。理解这些机制有助于我们更好地优化并发程序,提高系统效率。然而,在实际开发中,通常推荐使用...
Java线程池是Java并发编程中的重要组成部分,它...总之,通过Java线程池和观察者模式的结合,我们可以构建一个健壮的多线程系统,即使在部分线程意外终止的情况下,也能及时发现并采取措施恢复,确保系统的稳定运行。
计算机后端-Java-Java核心基础-第20章 多线程 20. 创建多线程的方式四:使用线程池.avi