多线程数据库操作
java并发编程-Executor框架
http://blog.csdn.net/shoubuliaolebu/article/details/7441811
Thread_跨节点会合查询
http://www.myexception.cn/program/1057745.html
mysql 索引
http://www.cnblogs.com/tianhuilove/archive/2011/09/05/2167795.html
c3p0
http://blog.sina.com.cn/s/blog_6c5f4d3c01012gtq.html
ForkJoinPool VS ExecutorService 实例分析
www.iteye.com/topic/1117483
差不多每1万条插入一次数据,性能最高,40秒左右。
http://hi.baidu.com/yiewgckawnlpuwe/item/80adbb9d5c263eb282d29571
MySQL五种优化插入数据表的查询方法 by cubeking
http://hi.baidu.com/cubeking/item/598abdc08b3b9225ef46651b
mysql批量提交的优化
http://hidba.org/?p=369
类 ThreadPoolExecutor
http://www.cjsdn.net/Doc/JDK50/java/util/concurrent/ThreadPoolExecutor.html
ExecutorService+FutureTask实现程序执行超时监控
http://blog.csdn.net/navy0418/article/details/6612105
JAVA多线程的控制JAVA 5.0
http://blog.sina.com.cn/s/blog_5efa347301011zk3.html
http://v.youku.com/v_show/id_XNTA5ODk4NzQ0.html
C3p0使用时出现的异常及解决方案
http://blog.csdn.net/t12x3456/article/details/7650404
c3p0存在严重bug
http://weifly.iteye.com/blog/1227182
C3P0 bug 解决办法: 关闭的连接 still in use
http://bbs.csdn.net/topics/390290277
基于多线程的数据库连接池
http://blog.csdn.net/lihao8023/article/details/4237334
快速批量插入setLocalInfileInputStream的用法
http://blog.csdn.net/nsrainbow/article/details/8206050
MySQL性能优化的最佳21条经验
http://www.csdn.net/article/2011-08-09/302869
mysql快速插入/更新大量记录
http://www.cnblogs.com/chutianyao/archive/2012/07/18/2597330.html
数据库插入百万数据
http://blog.csdn.net/godfrey90/article/details/6534980
TYPE_SCROLL_INSENSITIVE:结果集的游标可以上下移动,当数据库变化时,当前结果集不变。
ResultSet.CONCUR_READ_ONLY:指定不可以更新ResultSet
以上两个参数是在对数据库进行分页处理时用到的。
分页经验:
分页表搞起来有些麻烦,DBA也提供了很多方法,找了个办法,比较符合我的业务要求,排序取
第一次: select userid from table order by userid asc limit 10000;
下一页:这里假设上一页的第1W 的userid为504010001,如下写
select userid from table where userid>504010001 order by userid asc limit 10000;
每次传入上次最大的一个userid,这样速度就很快了,数据多大都没问题
import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CompletionService; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.annotation.Resource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; /** * 多线长异步插入 */ @Component("cacheToMysqlTaskObj") public class CacheToMysqlTask{ Logger logger = LoggerFactory.getLogger(CacheToMysqlTask.class); @Resource private BarBatchDAO barBatchDAO; private static int initThreads = 10; private ExecutorService executorService; private CompletionService<Object> completionService; //每次执行的记录 private final static long exeStep = 10000; //任务未完成,休停时间 private final static long timeStep = 1*1000; //休停超时 private final static long stopStep = 1*60*60*1000; //递归超时 hours private final static int totalStep = 4; //1000W private final static int overRecords = 10000000; private final static int preFixLen = Constants.TAIR_FOOTBAR_VIEW.length(); public void exeToMysqlTask() { try{ logger.warn("import tair data to Mysql ..."); this.exeToMysqlTask(1, 10000, overRecords); logger.warn("over ..."); if(executorService != null && !executorService.isShutdown()){ executorService.shutdown(); logger.warn("executorService shutdown ..."); } }catch(Exception e){ logger.error("footbar mysql to tair failed:"+e); } } long totalTimes = 0L; public boolean exeToMysqlTask(long start,long end,long overRecords){ try{ long startTime = System.currentTimeMillis(); if(executorService == null || executorService.isShutdown()){ executorService = Executors.newFixedThreadPool(initThreads); completionService = new ExecutorCompletionService<Object>(executorService); } long kipStart = start; long kipEnd = end; long kipMax = overRecords; if(kipEnd<=0 || kipStart>kipEnd){ logger.error("执行的次数不对!"); return false; } if(kipEnd >= 8000000){ if(kipEnd%100000 == 0){ logger.warn("数据告警太大建议先拆分再操作!"); } } if(kipEnd > kipMax){ logger.warn("需处理数据 "+kipEnd+" 数据太大1000W以后的数据任务将不执行!"); return false; } for(int i=0;i<15;i++){ List<FootInfo> saveOrUpdateList = new ArrayList<FootInfo>(); for(;;){ //构造业务数据 saveOrUpdateList.add(footInfo); } if(i==10){ logger.error(i+"业务数据构造循环结束"); } if(saveOrUpdateList.size()>0){ BatchTask task = new BatchTask(batchDAO,saveOrUpdateList); if (!executorService.isShutdown()) { completionService.submit(task); } } } //保证最后一笔记录执行,避免无限循环 if(kipEnd > kipMax){ kipStart = kipEnd + 1; kipEnd = kipStart + exeStep; logger.warn("the task finished,after 2 sec over!"); break; } kipStart = kipEnd + 1; kipEnd = kipStart + exeStep; if(kipEnd>=kipMax){ kipEnd = kipMax; } if (!executorService.isShutdown()){ executorService.shutdown(); } boolean sleepKip = executorService.isTerminated(); long times = 0L; while(!sleepKip){ Thread.sleep(timeStep); sleepKip = executorService.isTerminated(); times += timeStep; if(times >= stopStep){ logger.error("单次服务所有task累计执行时间超过1个小时,请排查!" + times); executorService.shutdownNow(); break; } } //times timeout exit ... if(times >= stopStep){ logger.warn("times timeout exit ..."); return false; } long endTime = System.currentTimeMillis(); totalTimes += (endTime-startTime); //递归超时 long hours = (totalTimes % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60); if(hours >= totalStep){ logger.warn("totalTimes timeout exit: "+totalTimes+" milliseconds!"); //return false; } if(kipEnd < kipMax){ return exeToMysqlTask(kipStart,kipEnd,kipMax); } }catch(Exception e){ logger.error("foot bar tair to mysql failed:"+e); } return true; } } class BatchTask implements Callable<Object> { private List<FootBarInfo> list; private BatchDAO barBatchDAO; public BatchTask(){ } public BatchTask(BatchDAO barBatchDAO,List<FootInfo> list){ this.list = list; this.barBatchDAO = barBatchDAO; } public synchronized String call() throws Exception { if(list != null){ barBatchDAO.batchSaveOrUpdate(list); } return ""; } }
千万级插入更新,分页技巧
import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.annotation.Resource; import org.springframework.stereotype.Component; @Component("batchDAO") public class BatchDAO { @Resource private DataSource dataSource; static StringBuffer saveOrUpdateStr = new StringBuffer(); static StringBuffer nextPageStr = new StringBuffer(); static int commitInt = 2000; static{ saveOrUpdateStr.append("insert into foot_info (userid,siteinfo,create_time) values "); saveOrUpdateStr.append("(?,?,now())"); //组装更新语句 for(int i=1;i<commitInt;i++){ saveOrUpdateStr.append(",(?,?,now())"); } saveOrUpdateStr.append(" on duplicate key update siteinfo=values(siteinfo),create_time=values(create_time);"); nextPageStr.append(" select userid from foot_info "); nextPageStr.append(" where userid >= ? "); nextPageStr.append(" order by userid asc limit ? "); } /** * 将记录更新到daogou_footbar_info表,不存在插入,存在更新siteinfo * 单笔提交5000条 * @param footBarList * @throws SQLException */ public void batchSaveOrUpdate(List<FootInfo> footList) throws SQLException{ Connection conn = null; PreparedStatement ps = null; try { conn = dataSource.getConnection(); conn.setAutoCommit(false); ps = conn.prepareStatement(saveOrUpdateStr.toString()); List<List<FootInfo>> splitList = splitList(footList,commitInt); for(int i=0;i<splitList.size();i++){ List<FootInfo> lists = splitList.get(i); //不够整次处理 if(lists.size()<commitInt){ StringBuffer updateTmp = new StringBuffer(); updateTmp.append(" insert into foot_info (userid,siteinfo,create_time) values "); updateTmp.append("(?,?,now())"); //组装更新语句 for(int n=1;n<lists.size();n++){ updateTmp.append(",(?,?,now())"); } updateTmp.append(" on duplicate key update siteinfo=values(siteinfo),create_time=values(create_time);"); ps = conn.prepareStatement(updateTmp.toString()); } for(int j=0;j<lists.size();j++){ FootInfo foot = (FootInfo)lists.get(j); ps.setLong(2*j+1, foot.getUserId()); ps.setString(2*j+2, foot.getSiteInfo()); } ps.execute(); conn.commit(); } } catch (SQLException e) { throw new SQLException(e); } finally{ if(ps != null){ try { ps.close(); } catch (SQLException e) { e.printStackTrace(); } } if(conn != null){ try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } /** * 对1000W表设计的分页 * 对userid排序,从最小记录开始查询每次limit 1W条 * 每次查询userid>上次查询的最大userid,这样解决了数据过大的分页问题,这里只做下一页的分页 * @param start * @param step * @return * @throws SQLException */ public List<Long> getNextPage(Long start,Long step) throws SQLException{ Connection conn = null; PreparedStatement ps = null; ResultSet rs = null; try { conn = dataSource.getConnection(); ps = conn.prepareStatement(nextPageStr.toString()); ps.setLong(1, start); ps.setLong(2, step); rs = ps.executeQuery(); List<Long> userIds = new ArrayList<Long>(); while(rs.next()){ userIds.add(rs.getLong("userid")); } return userIds; } catch (SQLException e) { throw new SQLException(e); } finally{ if(rs != null){ try { rs.close(); } catch (SQLException e) { e.printStackTrace(); } } if(ps != null){ try { ps.close(); } catch (SQLException e) { e.printStackTrace(); } } if(conn != null){ try { conn.close(); } catch (SQLException e) { e.printStackTrace(); } } } } public static <T> List<List<T>> splitList(List<T> list, int pageSize) { List<List<T>> listArray = new ArrayList<List<T>>(); if(list != null && list.size()>0){ int listSize = list.size(); int page = (listSize + (pageSize-1))/ pageSize; for(int i=0;i<page;i++) { List<T> subList = new ArrayList<T>(); for(int j=0;j<listSize;j++) { int pageIndex = ( (j + 1) + (pageSize-1) ) / pageSize; if(pageIndex == (i + 1)) { subList.add(list.get(j)); } if( (j + 1) == ((j + 1) * pageSize) ) { break; } } listArray.add(subList); } } return listArray; } }
MysqlSequence
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import javax.annotation.Resource; import org.springframework.stereotype.Service; import com.google.common.base.Preconditions; import com.google.common.base.Strings; @Service("sequenceImpl") public class MysqlSequence implements Sequence { private int rangeStep = 1000; private volatile AtomicLong sequence = new AtomicLong(-1); private final Lock lock = new ReentrantLock(); private String sequenceName=""; private volatile long seqEndPoint ; @Resource private SequenceDao sequenceDao ; private int defaultTryTime = 100; @Override public long nextValue(String sequenceName) throws Exception { int tryTime = defaultTryTime; for (;;) { if (tryTime < 0) { throw new IllegalStateException("try too many times,maybe the database crashed!"); } tryTime--; if (sequence.get() == -1L) { lock.lock(); try { if(sequence.get() == -1L) { Long expectValue = sequenceDao.getSequnceNumber(sequenceName); boolean ret = sequenceDao.getAndIncrement(sequenceName, rangeStep,expectValue); if(!ret) continue; sequence.set(expectValue); seqEndPoint = sequence.get() + rangeStep; } } finally { lock.unlock(); } } lock.lock(); try { if(sequence.get() >= seqEndPoint) { sequence.set(-1L); continue; } return sequence.incrementAndGet(); } finally { lock.unlock(); } } } public void setRangeStep(int rangeStep) { Preconditions.checkArgument(rangeStep>=100,"rangeStep must large than 100!"); this.rangeStep = rangeStep; } public String getSequnceName() { return sequenceName; } public void setSequnceName(String sequenceName) { Preconditions.checkArgument(!Strings.isNullOrEmpty(sequenceName),"sequnceName is not allow null!"); this.sequenceName = sequenceName; } public void setDefaultTryTime(int defaultTryTime) { this.defaultTryTime = defaultTryTime; } }
相关推荐
在实现多线程数据库操作时,需要注意以下关键点: - **事务管理**:多线程可能会引发并发控制问题,比如脏读、不可重复读和幻读。使用数据库提供的事务机制(如ACID属性)和隔离级别可以避免这些问题。 - **数据库...
7. **性能优化**:为了进一步提高多线程数据库操作的性能,可以考虑使用批处理(Batch Processing),即在一次数据库调用中发送多条SQL语句。此外,合理调整数据库和应用服务器的配置参数,如连接池大小、线程数量等...
本文将深入探讨如何在Delphi中实现多线程数据库访问,并结合给定的文件名列表来推测可能的项目结构。 首先,多线程的基本概念是并发执行多个任务,每个任务在一个独立的线程上运行。在Delphi中,我们可以使用...
然后,我们将数据库操作封装为`Callable`任务,因为`Callable`可以返回结果,适合于数据库操作这种有返回值的操作。 ```java public class DatabaseTask implements Callable<Void> { private Connection ...
在IT领域,数据库是存储和管理数据的核心工具,而SQLite是一个轻量级的、开源的、自包含的数据库系统,广泛应用于移动设备和...通过合理的设计和优化,我们可以充分利用多线程的优势,实现高效、稳定的数据库操作。
在IT领域,多线程数据库应用程序编程是一项关键的技术,尤其在使用Delphi这种高效、强大的RAD(快速应用开发)工具时。Delphi以其高效的VCL框架和原生的编译器支持,使得开发者能够轻松地构建多线程的数据库应用,...
本项目涉及的关键知识点是“多线程对数据库操作”,主要使用Java语言实现,并与MySQL数据库进行交互。以下是对这些核心概念的详细阐述。 **1. 多线程** 多线程是指在一个进程中同时执行多个不同的线程。Java提供了...
多线程 数据库 插入 实例 自己写的还不错哦
* 支持多线程,保证获取到的连接一定是没有被其他线程正在使用 * 按需创建连接,可以创建多个连接,可以控制连接的数量 * 连接被复用,不是每次都重新创建一个新的连接(连接的创建是一个很消耗资源的过程) * ...
在Java编程中,多线程查询数据库是一种常见的优化策略,特别是在处理大数据量或者需要并行执行多个查询时。本文将详细探讨如何利用...通过理解和掌握这些知识点,我们可以有效地提高数据库操作的效率和系统的响应速度。
在IT领域,多线程、定时并发以及数据库操作是核心概念,特别是在开发高效、稳定的应用系统时。日志类则是追踪程序运行状态、排查问题的关键工具。以下将详细阐述这些知识点。 1. **多线程**:多线程是指在一个程序...
本主题将深入探讨"易语言多线程数据库查询对比"的相关知识点。 首先,我们要理解易语言中的多线程概念。在易语言中,创建线程可以通过调用特定的系统API函数或者使用内置的线程支持模块来实现。多线程允许程序在...
在这个"操作多线程删除数据库表,以及控制listbox多线程呈现"的小程序中,我们将探讨如何利用多线程来处理数据库操作和UI更新。 1. **多线程基础**:多线程是指在一个应用程序中同时运行多个独立的执行流。在.NET ...
在这个示例中,我们将探讨如何在Android中进行多线程数据库操作以及如何利用CursorAdapter实现异步加载数据库数据。 首先,我们来看多线程访问SQLite数据库。在Android中,主线程负责UI的更新和交互,而长时间运行...
通过分析和学习这个代码,你可以理解如何在实际项目中实现多线程数据库访问,以及如何解决相关的问题。 总之,多线程访问数据库是一项重要的技能,尤其在大型系统和高并发应用中。通过VC6.0的实践,你可以更好地...
事务的ACID属性(原子性、一致性、隔离性和持久性)是保证数据库操作正确性的基础。 4. **性能调优**:根据数据库系统的特性调整线程数量,过多的线程可能会导致上下文切换开销增大,反而降低性能。适当的线程池...
在开发基于Qt的应用程序时,经常会遇到需要使用多线程来进行SQLite数据库操作的情况。然而,多线程环境下的数据库操作相较于单线程来说更为复杂,需要特别注意一些细节问题,以确保程序的稳定性和效率。本文将总结在...
《JasonQt_Database Demo:多线程数据库操作详解》 在现代软件开发中,多线程已经成为提升程序性能和响应速度的重要手段。特别是在处理数据库操作时,为了保证系统的并发性和高效性,多线程技术的应用尤为关键。...