精华帖 (0) :: 良好帖 (0) :: 新手帖 (1) :: 隐藏帖 (0)
|
|
---|---|
作者 | 正文 |
发表时间:2011-04-07
最后修改:2011-04-07
最近做一个数据下载接口,功能是这样的:拿n个基金的id,轮训接口下载每一个基金id的详细回来录入自己的数据库。n>10000, 所以采用多线程下载入库。ok。。问题来了。当我采用200根线程同时做这个事情的时候。 调用接口没有问题,在入库的时候connection的创建我有两种想法: 第一:200根线程公用一个connection和PreparedStatement。因为我觉得200根线程的入库操作都是一样的。sql语句都一样。 唯一不同的是给sql参数赋值的过程。(也许会有人说。是否会有线程安全的问题。实践证明并没有引发异常。。。这也是我的一个疑问。 求解。。。。),还有一方面的考虑是,频繁的打开,关闭连接也是会造成了一定的内存消耗。 第二:用线程池管理connection,每根线程拥有自己的connection。(这个不用说,大部分首先都会考虑用线程池,但这里也会有一个问题。当启动200根线程,而连接池中供我的连接假设只有50个,那么也就是意味着有150根线程处于等的状态,这就照成整个接口全部执行完消耗的时间增大。)
希望有人能为我解答上面两种方案的优劣。! 代码是这样的。
public class DownloadTask implements Runnable { private final Connection con; public DownloadTask(Connection con) { this.con = con; } public void run() { // Long downloadLong = System.currentTimeMillis(); List<MonitorEntity> monitorEntitys = new ArrayList<MonitorEntity>(); //调用接口下载详细 for (String id : idsList) { MonitorEntity entity = getMonitorEntity(id); monitorEntitys.add(entity); } // System.out.println("i ="+i+" download api over "+(System.currentTimeMillis()-downloadLong)/1000+" s"); Long timeLong = System.currentTimeMillis(); Connection connection = con; PreparedStatement ps = null; try { // connection.setAutoCommit(false); ps = connection.prepareStatement(sqlString); for (MonitorEntity entity : monitorEntitys) { ps.setString(1, entity.getApi_name()); ps.setString(2, entity.getId()); ps.setString(3, entity.getId_type()); ps.setString(4, entity.getRequest_url()); ps.setInt(5, entity.getHttp_code()); ps.setInt(6, entity.getRes_code()); ps.setInt(7, entity.getRes_time()); ps.setString(8, entity.getRes_msg()); ps.setString(9, entity.getResult()); ps.setString(10, entity.getLast_update_time()); ps.setString(11, entity.getRequest_url()); ps.setInt(12, entity.getHttp_code()); ps.setInt(13, entity.getRes_code()); ps.setInt(14, entity.getRes_time()); ps.setString(15, entity.getRes_msg()); ps.setString(16, entity.getResult()); ps.setString(17, entity.getLast_update_time()); ps.addBatch(); } ps.executeBatch(); // connection.commit(); } catch (Exception e) { log.error("insert or update database error", e); } finally { jdbcUtil.closePreparedStatement(ps); } System.out.println("i ="+i+" insertOrUpdate database over "+(System.currentTimeMillis()-timeLong)/1000+" s"); countdown.countDown(); }
一个类实现Runnable接口,在这个这个类构造函数中增加一个参数Connection con,然后再主线程 new 子线程的时候吧Connection 作为参数传递进来。 这里只是公用一个connection, 效率很慢,一部分线程只用不到1s钟就执行完了。 但越到后面的线程执行的时间却越长,我个人认为是一个connection只能new出一定的PreparedStatement ,所以导致后面的线程必须等前面的线程执行完,释放掉PreparedStatement ,才能创建新的PreparedStatement 。不知道这样认为是否正确。 后来我想,如果是这样干脆把PreparedStatement 作为参数传递进来。是否能快呢? 代码如下:
public class DownloadTask implements Runnable { static Log log = LogFactory.getLog(DownloadTask.class); private final JDBCUtil jdbcUtil; private final Connection con; private final List<String> idsList; private final String url_head; private final String url_param; private final String sqlString; private final String api_name; private final String id_type; CountDownLatch countdown; private final PreparedStatement ps; private final int i; static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss"); public DownloadTask(JDBCUtil jdbcUtil, Connection con, List<String> idsList, String url_head, String url_param, String sqlString, String api_name, String id_type,CountDownLatch countdown,int i,PreparedStatement ps) { this.jdbcUtil = jdbcUtil; this.con = con; this.idsList = idsList; this.url_head = url_head; this.url_param = url_param; this.sqlString = sqlString; this.api_name = api_name; this.id_type = id_type; this.countdown = countdown; this.i = i ; this.ps = ps; } private MonitorEntity getMonitorEntity(String id) { MonitorEntity entity = new MonitorEntity(); String url = url_head + id + url_param; Long startTimeLong = System.currentTimeMillis(); String[] responseString; try { responseString = DownloadData.download(url); } catch (Exception e) { log.error("download error url="+url,e); return null; } Long endTimeLong = System.currentTimeMillis(); Long restimeLong = endTimeLong - startTimeLong; entity.setId(id); entity.setId_type(id_type); entity.setApi_name(api_name); entity.setRequest_url(url); entity.setHttp_code(Integer.parseInt(responseString[0])); entity.setRes_time(restimeLong.intValue()); String resultString = responseString[1]; try { JSONObject jsonObject = JSONObject.fromObject(resultString); JSONObject statusJsonObject = jsonObject.getJSONObject("status"); entity.setRes_code(statusJsonObject.getInt("code")); entity.setRes_msg(statusJsonObject.getString("message")); entity.setResult(jsonObject.getJSONObject("data").toString()); } catch (Exception e) { log.error("analysis jsonobject error ", e); entity.setRes_code(-1); entity.setRes_msg("analysis jsonobject error"); entity.setResult("error"); } entity.setLast_update_time(sf.format(new Date())); return entity; } public void run() { Long timeLong = System.currentTimeMillis(); // Long downloadLong = System.currentTimeMillis(); List<MonitorEntity> monitorEntitys = new ArrayList<MonitorEntity>(); for (String id : idsList) { MonitorEntity entity = getMonitorEntity(id); monitorEntitys.add(entity); } // System.out.println("i ="+i+" download api over "+(System.currentTimeMillis()-downloadLong)/1000+" s"); // Connection connection = jdbcUtil.getConnection(); //PreparedStatement ps = null; try { // connection.setAutoCommit(false); //ps = connection.prepareStatement(sqlString); for (MonitorEntity entity : monitorEntitys) { ps.setString(1, entity.getApi_name()); ps.setString(2, entity.getId()); ps.setString(3, entity.getId_type()); ps.setString(4, entity.getRequest_url()); ps.setInt(5, entity.getHttp_code()); ps.setInt(6, entity.getRes_code()); ps.setInt(7, entity.getRes_time()); ps.setString(8, entity.getRes_msg()); ps.setString(9, entity.getResult()); ps.setString(10, entity.getLast_update_time()); ps.setString(11, entity.getRequest_url()); ps.setInt(12, entity.getHttp_code()); ps.setInt(13, entity.getRes_code()); ps.setInt(14, entity.getRes_time()); ps.setString(15, entity.getRes_msg()); ps.setString(16, entity.getResult()); ps.setString(17, entity.getLast_update_time()); ps.addBatch(); } ps.executeBatch(); // connection.commit(); } catch (Exception e) { log.error("insert or update database error", e); } finally { //jdbcUtil.closePreparedStatement(ps);//由主线程控制关闭 //jdbcUtil.closeConnection(connection);//由主线程控制关闭 } System.out.println("i ="+i+" insertOrUpdate database over "+(System.currentTimeMillis()-timeLong)/1000+" s"); countdown.countDown(); } }
测试结果是入库的时间快了一倍,每一个线程的入库时间基本相同。(如果用多线程一部分线程回慢,因为要等需要的connection)。。 求解释。。。 多线程下公用一个PreparedStatement ,是否回引发线程安全的问题。理论上上面应该会引发才对。但运行结果。。却没有。。。 还有一个隐形问题。。 多线程公用一个PreparedStatement ,在赋值时是否会有问题。比如:A线程吧值赋到B线程去了。 反正异常是没有报。
主线程代码如下: package com.morningstar.api; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.morningstar.api.ApiConfig; import com.morningstar.api.util.JDBCParameterCallBack; import com.morningstar.api.util.JDBCUtil; public class Application { static Log log = LogFactory.getLog(Application.class); private static String sqlString = "(api_name,id,id_type,request_url,http_code,state_code,response_time,state_msg,result,last_update) " + "value(?,?,?,?,?,?,?,?,?,?)" + " on duplicate key update request_url=?,http_code=?,state_code=?,response_time=?," + "state_msg=?,result=?,last_update=?"; public static void main(String[] args) throws Exception { ApiConfig apiConfig = new ApiConfig(); String url_param = "?username=" + apiConfig.getUsername() + "&password=" + apiConfig.getPassword() + "&format=json"; //获取ids集合 Map<String, List<String>> idTypeMap = apiConfig.getIdsMap(); //任务数 int taskCount = apiConfig.getTask_count(); JDBCUtil jdbcUtil = new JDBCUtil(apiConfig.getJdbc_url(), apiConfig.getJdbc_name(), apiConfig.getJdbc_pass(), apiConfig.getJdbc_driver(), apiConfig.getMax_conn(), apiConfig.getMin_conn()); Connection con = null; //API名字集合 List<String> apiNames = apiConfig.getApi_names(); //获取创建表sql String createTableSql = apiConfig.getCreateTableSql(); Connection connection = jdbcUtil.getConnection(); for (String apiname : apiNames) { String url_head = apiConfig.getApi_url() + apiname + "/"; jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql); String sql = "insert t_monitor_"+apiname+sqlString; PreparedStatement ps = null; ps = connection.prepareStatement(sql); for (String idtype : idTypeMap.keySet()) { Long starttimeLong = System.currentTimeMillis(); List<String> idsList = idTypeMap.get(idtype); log.info("start download "+apiname+" " + idtype + " list size:" + idsList.size()); int number = idsList.size() / taskCount; int rem = idsList.size() % taskCount; if (rem != 0) { taskCount++; } CountDownLatch countdown = new CountDownLatch(taskCount); for (int i = 0; i < taskCount; i++) { int startIndex = i * number; int endIndex = (i + 1) * number > idsList.size() ? idsList .size() : (i + 1) * number; List<String> ids = idsList.subList(startIndex, endIndex); DownloadTask downloadTask = new DownloadTask(jdbcUtil, con, ids, url_head + idtype + "/", url_param, sql, apiname, idtype, countdown, i,ps); Thread thread = new Thread(downloadTask); thread.start(); } try { long timeout = 60; countdown.await(timeout, TimeUnit.MINUTES); } catch (InterruptedException e) { log.error("thread is interrupted", e); } Long endtimeLong = System.currentTimeMillis(); log.info("end download "+apiname+" " + idtype + " list size:" + idsList.size() + " over time:" + (endtimeLong - starttimeLong) / 1000 + "s"); } jdbcUtil.closePreparedStatement(ps); } jdbcUtil.closeConnection(connection); } }
声明:ITeye文章版权属于作者,受法律保护。没有作者书面许可不得转载。
推荐链接
|
|
返回顶楼 | |
发表时间:2011-04-07
你给出操作DownloadTask的代码
|
|
返回顶楼 | |
发表时间:2011-04-07
kanny87929 写道 你给出操作DownloadTask的代码 ?????上面就DownloadTask的代码呀。 主线程代码如下: package com.morningstar.api; import java.sql.Connection; import java.sql.PreparedStatement; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.morningstar.api.ApiConfig; import com.morningstar.api.util.JDBCParameterCallBack; import com.morningstar.api.util.JDBCUtil; public class Application { static Log log = LogFactory.getLog(Application.class); private static String sqlString = "(api_name,id,id_type,request_url,http_code,state_code,response_time,state_msg,result,last_update) " + "value(?,?,?,?,?,?,?,?,?,?)" + " on duplicate key update request_url=?,http_code=?,state_code=?,response_time=?," + "state_msg=?,result=?,last_update=?"; public static void main(String[] args) throws Exception { ApiConfig apiConfig = new ApiConfig(); String url_param = "?username=" + apiConfig.getUsername() + "&password=" + apiConfig.getPassword() + "&format=json"; //获取ids集合 Map<String, List<String>> idTypeMap = apiConfig.getIdsMap(); //任务数 int taskCount = apiConfig.getTask_count(); JDBCUtil jdbcUtil = new JDBCUtil(apiConfig.getJdbc_url(), apiConfig.getJdbc_name(), apiConfig.getJdbc_pass(), apiConfig.getJdbc_driver(), apiConfig.getMax_conn(), apiConfig.getMin_conn()); Connection con = null; //API名字集合 List<String> apiNames = apiConfig.getApi_names(); //获取创建表sql String createTableSql = apiConfig.getCreateTableSql(); Connection connection = jdbcUtil.getConnection(); for (String apiname : apiNames) { String url_head = apiConfig.getApi_url() + apiname + "/"; jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql); String sql = "insert t_monitor_"+apiname+sqlString; PreparedStatement ps = null; ps = connection.prepareStatement(sql); for (String idtype : idTypeMap.keySet()) { Long starttimeLong = System.currentTimeMillis(); List<String> idsList = idTypeMap.get(idtype); log.info("start download "+apiname+" " + idtype + " list size:" + idsList.size()); int number = idsList.size() / taskCount; int rem = idsList.size() % taskCount; if (rem != 0) { taskCount++; } CountDownLatch countdown = new CountDownLatch(taskCount); for (int i = 0; i < taskCount; i++) { int startIndex = i * number; int endIndex = (i + 1) * number > idsList.size() ? idsList .size() : (i + 1) * number; List<String> ids = idsList.subList(startIndex, endIndex); DownloadTask downloadTask = new DownloadTask(jdbcUtil, con, ids, url_head + idtype + "/", url_param, sql, apiname, idtype, countdown, i,ps); Thread thread = new Thread(downloadTask); thread.start(); } try { long timeout = 60; countdown.await(timeout, TimeUnit.MINUTES); } catch (InterruptedException e) { log.error("thread is interrupted", e); } Long endtimeLong = System.currentTimeMillis(); log.info("end download "+apiname+" " + idtype + " list size:" + idsList.size() + " over time:" + (endtimeLong - starttimeLong) / 1000 + "s"); } jdbcUtil.closePreparedStatement(ps); } jdbcUtil.closeConnection(connection); } } |
|
返回顶楼 | |
发表时间:2011-04-07
//任务数
34. int taskCount = apiConfig.getTask_count(); 这个是什么我不是很明白 |
|
返回顶楼 | |
发表时间:2011-04-07
最后修改:2011-04-07
引用 Connection connection = jdbcUtil.getConnection(); 46. for (String apiname : apiNames) { 47. String url_head = apiConfig.getApi_url() + apiname + "/"; 48. 49. jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql); 50. String sql = "insert t_monitor_"+apiname+sqlString; 51. PreparedStatement ps = null; 52. ps = connection.prepareStatement(sql); 53. for (String idtype : idTypeMap.keySet()) { 54. Long starttimeLong = System.currentTimeMillis(); 55. List<String> idsList = idTypeMap.get(idtype); 56. log.info("start download "+apiname+" " + idtype + " list size:" 57. + idsList.size()); 58. int number = idsList.size() / taskCount; 59. int rem = idsList.size() % taskCount; 60. if (rem != 0) { 61. taskCount++; 62. } 63. CountDownLatch countdown = new CountDownLatch(taskCount); 64. for (int i = 0; i < taskCount; i++) { 65. 66. int startIndex = i * number; 67. int endIndex = (i + 1) * number > idsList.size() ? idsList 68. .size() : (i + 1) * number; 69. List<String> ids = idsList.subList(startIndex, endIndex); 70. 71. DownloadTask downloadTask = new DownloadTask(jdbcUtil, con, 72. ids, url_head + idtype + "/", url_param, sql, 73. apiname, idtype, countdown, i,ps); 74. Thread thread = new Thread(downloadTask); 75. thread.start(); 76. } 77. try { 78. long timeout = 60; 79. countdown.await(timeout, TimeUnit.MINUTES); 80. } catch (InterruptedException e) { 81. log.error("thread is interrupted", e); 82. } 83. Long endtimeLong = System.currentTimeMillis(); 84. log.info("end download "+apiname+" " + idtype + " list size:" + idsList.size() 85. + " over time:" + (endtimeLong - starttimeLong) / 1000 86. + "s"); 87. 88. } 89. jdbcUtil.closePreparedStatement(ps); 90. } 91. jdbcUtil.closeConnection(connection); 楼主写的代码让人看着蛋痛 1。所有的基金实体都有一个id 2。而且基金实体个数超过1w个 3。id还分类型,也就是基金实体分类型,同一个类型的id在一个id集合里 4。所有id集合又存放在一个idMap中 我看你的代码是,用了第一种方案 有一个apiName的集合,而且你只打开了一个连接。 每一个apiName创建一个表,每个表都要插入所有的基金实体 这样你的代码写的有问题 如果一个id在所有类型的id集合中只出现一次,那么就是说,你要下载 apiNames.size次这个实体,有问题。 |
|
返回顶楼 | |
发表时间:2011-04-07
下载的东西放到一个Queue里面,做个后台线程定时刷新到数据库中。每次刷新时,Queue有多少记录,就一次性写入数据库中。
InsertQueueService这种概念:[http://code.google.com/p/guzz/wiki/AppendCoreService?wl=zh-Hans#5._数据库插入队列服务:] |
|
返回顶楼 | |
发表时间:2011-04-07
kanny87929 写道 //任务数
34. int taskCount = apiConfig.getTask_count(); 这个是什么我不是很明白 这个线程数,在配置文件中出现的。线程数是动态的。。。。 |
|
返回顶楼 | |
发表时间:2011-04-07
我一个一个解释哈。。 首先api的个数是动态的。 也就是说。api的名字会增加。我把api配置在配置文件中。
一个api对应一个表。 所以就出现第一个for循环。 先创建表。 循环api 2.基金id和基金类型。 a类型下有1w的id b类型下也有n个。 所以就以map储存。 类型做键,idlist做值。 出现第二个for循环。 循环list。 3.取出每一个类型的idlist后,根据配置文件中配置的线程数,分配每一根线程控制当前idlist的个数。 4.上面这些都不重点。 只是业务逻辑问题。 我的问题是。。多线程下。。 公用一个connection,PreparedStatement 有没有问题。 理论上应该是有问题的。 但现在没有任何异常。 而且比采用连接池更快。快了一倍的时间。 用连接池:一部分线程用了1s。 一部分线程用了10s 越到后面的线程越慢。(这好说。 因为连接池的连接不够用,后面的线程需等前面的线程先释放掉。所以后面的线程慢一些。) 可我公用一个 PreparedStatement , 就出现把PreparedStatement 做为参数传递进去。结果是:每一根线程执行的时间相等了。 也就是说。。真正达到了并行的效果。 kanny87929 写道 引用 Connection connection = jdbcUtil.getConnection(); 46. for (String apiname : apiNames) { 47. String url_head = apiConfig.getApi_url() + apiname + "/"; 48. 49. jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql); 50. String sql = "insert t_monitor_"+apiname+sqlString; 51. PreparedStatement ps = null; 52. ps = connection.prepareStatement(sql); 53. for (String idtype : idTypeMap.keySet()) { 54. Long starttimeLong = System.currentTimeMillis(); 55. List<String> idsList = idTypeMap.get(idtype); 56. log.info("start download "+apiname+" " + idtype + " list size:" 57. + idsList.size()); 58. int number = idsList.size() / taskCount; 59. int rem = idsList.size() % taskCount; 60. if (rem != 0) { 61. taskCount++; 62. } 63. CountDownLatch countdown = new CountDownLatch(taskCount); 64. for (int i = 0; i < taskCount; i++) { 65. 66. int startIndex = i * number; 67. int endIndex = (i + 1) * number > idsList.size() ? idsList 68. .size() : (i + 1) * number; 69. List<String> ids = idsList.subList(startIndex, endIndex); 70. 71. DownloadTask downloadTask = new DownloadTask(jdbcUtil, con, 72. ids, url_head + idtype + "/", url_param, sql, 73. apiname, idtype, countdown, i,ps); 74. Thread thread = new Thread(downloadTask); 75. thread.start(); 76. } 77. try { 78. long timeout = 60; 79. countdown.await(timeout, TimeUnit.MINUTES); 80. } catch (InterruptedException e) { 81. log.error("thread is interrupted", e); 82. } 83. Long endtimeLong = System.currentTimeMillis(); 84. log.info("end download "+apiname+" " + idtype + " list size:" + idsList.size() 85. + " over time:" + (endtimeLong - starttimeLong) / 1000 86. + "s"); 87. 88. } 89. jdbcUtil.closePreparedStatement(ps); 90. } 91. jdbcUtil.closeConnection(connection); 楼主写的代码让人看着蛋痛 1。所有的基金实体都有一个id 2。而且基金实体个数超过1w个 3。id还分类型,也就是基金实体分类型,同一个类型的id在一个id集合里 4。所有id集合又存放在一个idMap中 我看你的代码是,用了第一种方案 有一个apiName的集合,而且你只打开了一个连接。 每一个apiName创建一个表,每个表都要插入所有的基金实体 这样你的代码写的有问题 如果一个id在所有类型的id集合中只出现一次,那么就是说,你要下载 apiNames.size次这个实体,有问题。 |
|
返回顶楼 | |
发表时间:2011-04-07
共用connection,
你试试auto commit=false,丢几个大事务(latency)或嵌套事务进去,多线程下,你就能蛋疼了。 |
|
返回顶楼 | |
发表时间:2011-04-07
试想一下,
同一个connection/statement,Thread1 prepared啊,batch啊,insert啊,commit的时候,Thread2 rollback first,.... |
|
返回顶楼 | |