`
zhoushu126
  • 浏览: 81663 次
  • 性别: Icon_minigender_1
  • 来自: 长沙
社区版块
存档分类
最新评论

多线程数据库操作

 
阅读更多

java并发编程-Executor框架
http://blog.csdn.net/shoubuliaolebu/article/details/7441811

Thread_跨节点会合查询
http://www.myexception.cn/program/1057745.html
mysql 索引
http://www.cnblogs.com/tianhuilove/archive/2011/09/05/2167795.html
c3p0
http://blog.sina.com.cn/s/blog_6c5f4d3c01012gtq.html
ForkJoinPool VS ExecutorService 实例分析
www.iteye.com/topic/1117483


差不多每1万条插入一次数据,性能最高,40秒左右。
http://hi.baidu.com/yiewgckawnlpuwe/item/80adbb9d5c263eb282d29571
MySQL五种优化插入数据表的查询方法 by cubeking
http://hi.baidu.com/cubeking/item/598abdc08b3b9225ef46651b
mysql批量提交的优化
http://hidba.org/?p=369
类 ThreadPoolExecutor
http://www.cjsdn.net/Doc/JDK50/java/util/concurrent/ThreadPoolExecutor.html
ExecutorService+FutureTask实现程序执行超时监控
http://blog.csdn.net/navy0418/article/details/6612105

JAVA多线程的控制JAVA 5.0
http://blog.sina.com.cn/s/blog_5efa347301011zk3.html

http://v.youku.com/v_show/id_XNTA5ODk4NzQ0.html
C3p0使用时出现的异常及解决方案
http://blog.csdn.net/t12x3456/article/details/7650404
c3p0存在严重bug
http://weifly.iteye.com/blog/1227182
C3P0 bug 解决办法: 关闭的连接 still in use
http://bbs.csdn.net/topics/390290277
基于多线程的数据库连接池
http://blog.csdn.net/lihao8023/article/details/4237334
快速批量插入setLocalInfileInputStream的用法
http://blog.csdn.net/nsrainbow/article/details/8206050
MySQL性能优化的最佳21条经验
http://www.csdn.net/article/2011-08-09/302869
mysql快速插入/更新大量记录
http://www.cnblogs.com/chutianyao/archive/2012/07/18/2597330.html
数据库插入百万数据
http://blog.csdn.net/godfrey90/article/details/6534980

TYPE_SCROLL_INSENSITIVE:结果集的游标可以上下移动,当数据库变化时,当前结果集不变。
ResultSet.CONCUR_READ_ONLY:指定不可以更新ResultSet
以上两个参数是在对数据库进行分页处理时用到的。


分页经验:
分页表搞起来有些麻烦,DBA也提供了很多方法,找了个办法,比较符合我的业务要求,排序取
第一次: select userid from table order by userid asc limit 10000;
下一页:这里假设上一页的第1W 的userid为504010001,如下写
select userid from table where userid>504010001 order by userid asc limit 10000;
每次传入上次最大的一个userid,这样速度就很快了,数据多大都没问题

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * 多线长异步插入
 */
@Component("cacheToMysqlTaskObj")
public class CacheToMysqlTask{
	Logger logger = LoggerFactory.getLogger(CacheToMysqlTask.class);
	
	@Resource
    private BarBatchDAO barBatchDAO;

	private static int initThreads = 10;
	private ExecutorService executorService;
	private CompletionService<Object> completionService;
	//每次执行的记录
	private final static long exeStep = 10000;
	//任务未完成,休停时间
	private final static long timeStep = 1*1000;
	//休停超时
	private final static long stopStep = 1*60*60*1000;
	//递归超时 hours
	private final static int totalStep = 4;
	//1000W
	private final static int overRecords = 10000000;
	
	private final static int preFixLen = Constants.TAIR_FOOTBAR_VIEW.length();
	
	public void exeToMysqlTask() {
		try{
			logger.warn("import tair data to Mysql ...");
			this.exeToMysqlTask(1, 10000, overRecords);
			logger.warn("over ...");
			
			if(executorService != null && !executorService.isShutdown()){
				executorService.shutdown();
				logger.warn("executorService shutdown ...");
			}
		}catch(Exception e){
			logger.error("footbar mysql to tair failed:"+e);
		}
	}
	
	long totalTimes = 0L;
	public boolean exeToMysqlTask(long start,long end,long overRecords){
		try{
			long startTime = System.currentTimeMillis();
			
			if(executorService == null || executorService.isShutdown()){
				executorService = Executors.newFixedThreadPool(initThreads);
				completionService = new ExecutorCompletionService<Object>(executorService);
			}

			long kipStart = start;
			long kipEnd = end;
			long kipMax = overRecords;
			
			if(kipEnd<=0 || kipStart>kipEnd){
				logger.error("执行的次数不对!");
				return false;
			}
			
			if(kipEnd >= 8000000){
				if(kipEnd%100000 == 0){
					logger.warn("数据告警太大建议先拆分再操作!");
			    }
			}
			
			if(kipEnd > kipMax){
				logger.warn("需处理数据 "+kipEnd+" 数据太大1000W以后的数据任务将不执行!");
				return false;
			}
			
			for(int i=0;i<15;i++){
				List<FootInfo> saveOrUpdateList = new ArrayList<FootInfo>();
				for(;;){
					//构造业务数据
					saveOrUpdateList.add(footInfo);
				}
				if(i==10){
					logger.error(i+"业务数据构造循环结束");
				}
				if(saveOrUpdateList.size()>0){
					BatchTask task = new BatchTask(batchDAO,saveOrUpdateList);
					if (!executorService.isShutdown()) {
						completionService.submit(task);
					}
				}
			}

			//保证最后一笔记录执行,避免无限循环
			if(kipEnd > kipMax){
				kipStart = kipEnd + 1;
				kipEnd = kipStart + exeStep;
				logger.warn("the task finished,after 2 sec over!");
				break;
			}
			
			kipStart = kipEnd + 1;
			kipEnd = kipStart + exeStep;
			
			if(kipEnd>=kipMax){
				kipEnd = kipMax;
			}

			if (!executorService.isShutdown()){
				executorService.shutdown();
			}
			
			boolean sleepKip = executorService.isTerminated();
			long times = 0L;
			while(!sleepKip){
				Thread.sleep(timeStep);
				sleepKip = executorService.isTerminated();
				times += timeStep;
				if(times >= stopStep){
					logger.error("单次服务所有task累计执行时间超过1个小时,请排查!" + times);
					executorService.shutdownNow();
					break;
				}
			}
			
			//times timeout exit ...
			if(times >= stopStep){
				logger.warn("times timeout exit ...");
				return false;
			}
			
			long endTime = System.currentTimeMillis();
			
			totalTimes += (endTime-startTime);
			//递归超时
			long hours = (totalTimes % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60);  
			if(hours >= totalStep){
				logger.warn("totalTimes timeout exit: "+totalTimes+" milliseconds!");
				//return false;
			}
			
			if(kipEnd < kipMax){
				return exeToMysqlTask(kipStart,kipEnd,kipMax);
			}
		}catch(Exception e){
			logger.error("foot bar tair to mysql failed:"+e);
		}
		return true;
	}
}

class BatchTask implements Callable<Object> {
	private List<FootBarInfo> list;
    private BatchDAO barBatchDAO;

	public BatchTask(){
		
	}
	
	public BatchTask(BatchDAO barBatchDAO,List<FootInfo> list){
		this.list = list;
		this.barBatchDAO = barBatchDAO;
	}
	
	public synchronized String call() throws Exception {
		if(list != null){
			barBatchDAO.batchSaveOrUpdate(list);
		}
		return "";
	}
}


千万级插入更新,分页技巧
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;
import org.springframework.stereotype.Component;

@Component("batchDAO") 
public class BatchDAO {
	@Resource
    private DataSource dataSource;

	static StringBuffer saveOrUpdateStr = new StringBuffer();
	static StringBuffer nextPageStr = new StringBuffer();
	
	static int commitInt = 2000;
	static{
		saveOrUpdateStr.append("insert into foot_info (userid,siteinfo,create_time) values ");
		saveOrUpdateStr.append("(?,?,now())");
		//组装更新语句
		for(int i=1;i<commitInt;i++){
			saveOrUpdateStr.append(",(?,?,now())");
		}
		saveOrUpdateStr.append(" on duplicate key update siteinfo=values(siteinfo),create_time=values(create_time);");

		nextPageStr.append(" select userid from foot_info ");
	    nextPageStr.append(" where userid >= ? ");
		nextPageStr.append(" order by userid asc limit ? ");
	}

	/**
	 * 将记录更新到daogou_footbar_info表,不存在插入,存在更新siteinfo
	 * 单笔提交5000条
	 * @param footBarList
	 * @throws SQLException
	 */
	public void batchSaveOrUpdate(List<FootInfo> footList) throws SQLException{
		Connection conn = null;
		PreparedStatement ps = null;
		try {
			conn = dataSource.getConnection();
			conn.setAutoCommit(false);
			ps = conn.prepareStatement(saveOrUpdateStr.toString());
			
			List<List<FootInfo>> splitList = splitList(footList,commitInt);
			for(int i=0;i<splitList.size();i++){
				List<FootInfo> lists = splitList.get(i);
				//不够整次处理
				if(lists.size()<commitInt){
					StringBuffer updateTmp = new StringBuffer();
					updateTmp.append(" insert into foot_info (userid,siteinfo,create_time) values ");
					updateTmp.append("(?,?,now())");
					//组装更新语句
					for(int n=1;n<lists.size();n++){
						updateTmp.append(",(?,?,now())");
					}
					updateTmp.append(" on duplicate key update siteinfo=values(siteinfo),create_time=values(create_time);");
					ps = conn.prepareStatement(updateTmp.toString());
				}
				
				for(int j=0;j<lists.size();j++){
					FootInfo foot = (FootInfo)lists.get(j);
					ps.setLong(2*j+1, foot.getUserId());
					ps.setString(2*j+2, foot.getSiteInfo());
				}
				
				ps.execute();
				conn.commit();
			}
		} catch (SQLException e) {
			throw new SQLException(e);
		} finally{
			if(ps != null){
				try {
					ps.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
			
			if(conn != null){
				try {
					conn.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * 对1000W表设计的分页
	 * 对userid排序,从最小记录开始查询每次limit 1W条
	 * 每次查询userid>上次查询的最大userid,这样解决了数据过大的分页问题,这里只做下一页的分页
	 * @param start
	 * @param step
	 * @return
	 * @throws SQLException
	 */
	public List<Long> getNextPage(Long start,Long step)  throws SQLException{
		Connection conn = null;
		PreparedStatement ps = null;
		ResultSet rs = null;
		try {
			conn = dataSource.getConnection();
			ps = conn.prepareStatement(nextPageStr.toString());
			ps.setLong(1, start);
			ps.setLong(2, step);
			rs = ps.executeQuery();
			
			List<Long> userIds = new ArrayList<Long>();
			while(rs.next()){
				userIds.add(rs.getLong("userid"));
			}
			return userIds;
		} catch (SQLException e) {
			throw new SQLException(e);
		} finally{
			if(rs != null){
				try {
					rs.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
			
			if(ps != null){
				try {
					ps.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
			
			if(conn != null){
				try {
					conn.close();
				} catch (SQLException e) {
					e.printStackTrace();
				}
			}
		}
	}

	public static <T> List<List<T>> splitList(List<T> list, int pageSize) {
		List<List<T>> listArray = new ArrayList<List<T>>();
        if(list != null && list.size()>0){
	        int listSize = list.size();
	        int page = (listSize + (pageSize-1))/ pageSize;
	        for(int i=0;i<page;i++) {
	            List<T> subList = new ArrayList<T>();
	            for(int j=0;j<listSize;j++) {
	                int pageIndex = ( (j + 1) + (pageSize-1) ) / pageSize;
	                if(pageIndex == (i + 1)) {
	                    subList.add(list.get(j));
	                }
	                
	                if( (j + 1) == ((j + 1) * pageSize) ) {
	                    break;
	                }
	            }
	            listArray.add(subList);
	        }
        }
        return listArray;
    }
}


MysqlSequence
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Resource;
import org.springframework.stereotype.Service;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;


@Service("sequenceImpl")
public class MysqlSequence implements Sequence {
	
	private int rangeStep = 1000;
	
	private volatile AtomicLong sequence = new AtomicLong(-1); 
	
	private final Lock lock = new ReentrantLock();
	
	private String sequenceName="";
	
	private volatile long seqEndPoint ;
	
	@Resource
	private SequenceDao sequenceDao ;

	private int defaultTryTime = 100;
	
	@Override
	public long nextValue(String sequenceName) throws Exception {
		int tryTime = defaultTryTime;
		for (;;) {
			
			if (tryTime < 0) {
				throw new IllegalStateException("try too many times,maybe the database crashed!");
			}
			tryTime--;
			
			if (sequence.get() == -1L) {
				lock.lock();
				try {
					if(sequence.get() == -1L) {
						Long expectValue = sequenceDao.getSequnceNumber(sequenceName);
						boolean ret = sequenceDao.getAndIncrement(sequenceName, rangeStep,expectValue);
						if(!ret) continue;
						sequence.set(expectValue);	
						seqEndPoint = sequence.get() + rangeStep;
					}
				} finally {
					lock.unlock();
				}
				
			} 
			
			lock.lock();
			try {
				if(sequence.get() >= seqEndPoint) {
					sequence.set(-1L);
					continue;
				}
				return sequence.incrementAndGet();	
			} finally {
				lock.unlock();
			
			}
		}
		
	}


	public void setRangeStep(int rangeStep) {
		Preconditions.checkArgument(rangeStep>=100,"rangeStep must large than 100!");
		this.rangeStep = rangeStep;
	}

	public String getSequnceName() {
		return sequenceName;
	}

	public void setSequnceName(String sequenceName) {
		Preconditions.checkArgument(!Strings.isNullOrEmpty(sequenceName),"sequnceName is not allow null!");
		this.sequenceName = sequenceName;
	}

	public void setDefaultTryTime(int defaultTryTime) {
		this.defaultTryTime = defaultTryTime;
	}
}

分享到:
评论

相关推荐

    java 多线程数据库操作

    在实现多线程数据库操作时,需要注意以下关键点: - **事务管理**:多线程可能会引发并发控制问题,比如脏读、不可重复读和幻读。使用数据库提供的事务机制(如ACID属性)和隔离级别可以避免这些问题。 - **数据库...

    java 多线程操作数据库

    7. **性能优化**:为了进一步提高多线程数据库操作的性能,可以考虑使用批处理(Batch Processing),即在一次数据库调用中发送多条SQL语句。此外,合理调整数据库和应用服务器的配置参数,如连接池大小、线程数量等...

    delphi多线程访问数据库

    本文将深入探讨如何在Delphi中实现多线程数据库访问,并结合给定的文件名列表来推测可能的项目结构。 首先,多线程的基本概念是并发执行多个任务,每个任务在一个独立的线程上运行。在Delphi中,我们可以使用...

    java多线程处理数据库数据

    然后,我们将数据库操作封装为`Callable`任务,因为`Callable`可以返回结果,适合于数据库操作这种有返回值的操作。 ```java public class DatabaseTask implements Callable&lt;Void&gt; { private Connection ...

    MulThreadSQLiteTest多线程操作数据库

    在IT领域,数据库是存储和管理数据的核心工具,而SQLite是一个轻量级的、开源的、自包含的数据库系统,广泛应用于移动设备和...通过合理的设计和优化,我们可以充分利用多线程的优势,实现高效、稳定的数据库操作。

    Delphi多线程数据库应用程序编程技术

    在IT领域,多线程数据库应用程序编程是一项关键的技术,尤其在使用Delphi这种高效、强大的RAD(快速应用开发)工具时。Delphi以其高效的VCL框架和原生的编译器支持,使得开发者能够轻松地构建多线程的数据库应用,...

    多线程对数据库操作

    本项目涉及的关键知识点是“多线程对数据库操作”,主要使用Java语言实现,并与MySQL数据库进行交互。以下是对这些核心概念的详细阐述。 **1. 多线程** 多线程是指在一个进程中同时执行多个不同的线程。Java提供了...

    多线程 数据库 插入 实例

    多线程 数据库 插入 实例 自己写的还不错哦

    Qt 多线程连接数据库——数据库连接池

    * 支持多线程,保证获取到的连接一定是没有被其他线程正在使用 * 按需创建连接,可以创建多个连接,可以控制连接的数量 * 连接被复用,不是每次都重新创建一个新的连接(连接的创建是一个很消耗资源的过程) * ...

    java多线程查询数据库

    在Java编程中,多线程查询数据库是一种常见的优化策略,特别是在处理大数据量或者需要并行执行多个查询时。本文将详细探讨如何利用...通过理解和掌握这些知识点,我们可以有效地提高数据库操作的效率和系统的响应速度。

    多线程定时并发类数据库操作日之类

    在IT领域,多线程、定时并发以及数据库操作是核心概念,特别是在开发高效、稳定的应用系统时。日志类则是追踪程序运行状态、排查问题的关键工具。以下将详细阐述这些知识点。 1. **多线程**:多线程是指在一个程序...

    易语言多线程数据库查询对比

    本主题将深入探讨"易语言多线程数据库查询对比"的相关知识点。 首先,我们要理解易语言中的多线程概念。在易语言中,创建线程可以通过调用特定的系统API函数或者使用内置的线程支持模块来实现。多线程允许程序在...

    操作多线程删除数据库表,以及控制listbox多线程呈现

    在这个"操作多线程删除数据库表,以及控制listbox多线程呈现"的小程序中,我们将探讨如何利用多线程来处理数据库操作和UI更新。 1. **多线程基础**:多线程是指在一个应用程序中同时运行多个独立的执行流。在.NET ...

    android sqlite多线程和异步加载数据库数据示例

    在这个示例中,我们将探讨如何在Android中进行多线程数据库操作以及如何利用CursorAdapter实现异步加载数据库数据。 首先,我们来看多线程访问SQLite数据库。在Android中,主线程负责UI的更新和交互,而长时间运行...

    一个多线程访问数据库的代码

    通过分析和学习这个代码,你可以理解如何在实际项目中实现多线程数据库访问,以及如何解决相关的问题。 总之,多线程访问数据库是一项重要的技能,尤其在大型系统和高并发应用中。通过VC6.0的实践,你可以更好地...

    多线程同时查询同一数据库对比

    事务的ACID属性(原子性、一致性、隔离性和持久性)是保证数据库操作正确性的基础。 4. **性能调优**:根据数据库系统的特性调整线程数量,过多的线程可能会导致上下文切换开销增大,反而降低性能。适当的线程池...

    QT中sqlite多线程操作4个注意问题

    在开发基于Qt的应用程序时,经常会遇到需要使用多线程来进行SQLite数据库操作的情况。然而,多线程环境下的数据库操作相较于单线程来说更为复杂,需要特别注意一些细节问题,以确保程序的稳定性和效率。本文将总结在...

    JasonQt_Database Demo 支持多线程

    《JasonQt_Database Demo:多线程数据库操作详解》 在现代软件开发中,多线程已经成为提升程序性能和响应速度的重要手段。特别是在处理数据库操作时,为了保证系统的并发性和高效性,多线程技术的应用尤为关键。...

Global site tag (gtag.js) - Google Analytics