最近做一个数据下载接口,功能是这样的:拿n个基金的id,轮训接口下载每一个基金id的详细回来录入自己的数据库。n>10000, 所以采用多线程下载入库。ok。。问题来了。当我采用200根线程同时做这个事情的时候。 调用接口没有问题,在入库的时候connection的创建我有两种想法:
第一:200根线程公用一个connection和PreparedStatement。因为我觉得200根线程的入库操作都是一样的。sql语句都一样。 唯一不同的是给sql参数赋值的过程。(也许会有人说。是否会有线程安全的问题。实践证明并没有引发异常。。。这也是我的一个疑问。 求解。。。。),还有一方面的考虑是,频繁的打开,关闭连接也是会造成了一定的内存消耗。
第二:用线程池管理connection,每根线程拥有自己的connection。(这个不用说,大部分首先都会考虑用线程池,但这里也会有一个问题。当启动200根线程,而连接池中供我的连接假设只有50个,那么也就是意味着有150根线程处于等的状态,这就照成整个接口全部执行完消耗的时间增大。)
希望有人能为我解答上面两种方案的优劣。!
代码是这样的。
public class DownloadTask implements Runnable {
private final Connection con;
public DownloadTask(Connection con) {
this.con = con;
}
public void run() {
// Long downloadLong = System.currentTimeMillis();
List<MonitorEntity> monitorEntitys = new ArrayList<MonitorEntity>();
//调用接口下载详细
for (String id : idsList) {
MonitorEntity entity = getMonitorEntity(id);
monitorEntitys.add(entity);
}
// System.out.println("i ="+i+" download api over "+(System.currentTimeMillis()-downloadLong)/1000+" s");
Long timeLong = System.currentTimeMillis();
Connection connection = con;
PreparedStatement ps = null;
try {
// connection.setAutoCommit(false);
ps = connection.prepareStatement(sqlString);
for (MonitorEntity entity : monitorEntitys) {
ps.setString(1, entity.getApi_name());
ps.setString(2, entity.getId());
ps.setString(3, entity.getId_type());
ps.setString(4, entity.getRequest_url());
ps.setInt(5, entity.getHttp_code());
ps.setInt(6, entity.getRes_code());
ps.setInt(7, entity.getRes_time());
ps.setString(8, entity.getRes_msg());
ps.setString(9, entity.getResult());
ps.setString(10, entity.getLast_update_time());
ps.setString(11, entity.getRequest_url());
ps.setInt(12, entity.getHttp_code());
ps.setInt(13, entity.getRes_code());
ps.setInt(14, entity.getRes_time());
ps.setString(15, entity.getRes_msg());
ps.setString(16, entity.getResult());
ps.setString(17, entity.getLast_update_time());
ps.addBatch();
}
ps.executeBatch();
// connection.commit();
} catch (Exception e) {
log.error("insert or update database error", e);
} finally {
jdbcUtil.closePreparedStatement(ps);
}
System.out.println("i ="+i+" insertOrUpdate database over "+(System.currentTimeMillis()-timeLong)/1000+" s");
countdown.countDown();
}
一个类实现Runnable接口,在这个这个类构造函数中增加一个参数Connection con,然后再主线程 new 子线程的时候吧Connection 作为参数传递进来。
这里只是公用一个connection, 效率很慢,一部分线程只用不到1s钟就执行完了。 但越到后面的线程执行的时间却越长,我个人认为是一个connection只能new出一定的PreparedStatement ,所以导致后面的线程必须等前面的线程执行完,释放掉PreparedStatement ,才能创建新的PreparedStatement 。不知道这样认为是否正确。
后来我想,如果是这样干脆把PreparedStatement 作为参数传递进来。是否能快呢?
代码如下:
public class DownloadTask implements Runnable {
static Log log = LogFactory.getLog(DownloadTask.class);
private final JDBCUtil jdbcUtil;
private final Connection con;
private final List<String> idsList;
private final String url_head;
private final String url_param;
private final String sqlString;
private final String api_name;
private final String id_type;
CountDownLatch countdown;
private final PreparedStatement ps;
private final int i;
static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
public DownloadTask(JDBCUtil jdbcUtil, Connection con,
List<String> idsList, String url_head, String url_param,
String sqlString, String api_name, String id_type,CountDownLatch countdown,int i,PreparedStatement ps) {
this.jdbcUtil = jdbcUtil;
this.con = con;
this.idsList = idsList;
this.url_head = url_head;
this.url_param = url_param;
this.sqlString = sqlString;
this.api_name = api_name;
this.id_type = id_type;
this.countdown = countdown;
this.i = i ;
this.ps = ps;
}
private MonitorEntity getMonitorEntity(String id) {
MonitorEntity entity = new MonitorEntity();
String url = url_head + id + url_param;
Long startTimeLong = System.currentTimeMillis();
String[] responseString;
try {
responseString = DownloadData.download(url);
} catch (Exception e) {
log.error("download error url="+url,e);
return null;
}
Long endTimeLong = System.currentTimeMillis();
Long restimeLong = endTimeLong - startTimeLong;
entity.setId(id);
entity.setId_type(id_type);
entity.setApi_name(api_name);
entity.setRequest_url(url);
entity.setHttp_code(Integer.parseInt(responseString[0]));
entity.setRes_time(restimeLong.intValue());
String resultString = responseString[1];
try {
JSONObject jsonObject = JSONObject.fromObject(resultString);
JSONObject statusJsonObject = jsonObject.getJSONObject("status");
entity.setRes_code(statusJsonObject.getInt("code"));
entity.setRes_msg(statusJsonObject.getString("message"));
entity.setResult(jsonObject.getJSONObject("data").toString());
} catch (Exception e) {
log.error("analysis jsonobject error ", e);
entity.setRes_code(-1);
entity.setRes_msg("analysis jsonobject error");
entity.setResult("error");
}
entity.setLast_update_time(sf.format(new Date()));
return entity;
}
public void run() {
Long timeLong = System.currentTimeMillis();
// Long downloadLong = System.currentTimeMillis();
List<MonitorEntity> monitorEntitys = new ArrayList<MonitorEntity>();
for (String id : idsList) {
MonitorEntity entity = getMonitorEntity(id);
monitorEntitys.add(entity);
}
// System.out.println("i ="+i+" download api over "+(System.currentTimeMillis()-downloadLong)/1000+" s");
// Connection connection = jdbcUtil.getConnection();
//PreparedStatement ps = null;
try {
// connection.setAutoCommit(false);
//ps = connection.prepareStatement(sqlString);
for (MonitorEntity entity : monitorEntitys) {
ps.setString(1, entity.getApi_name());
ps.setString(2, entity.getId());
ps.setString(3, entity.getId_type());
ps.setString(4, entity.getRequest_url());
ps.setInt(5, entity.getHttp_code());
ps.setInt(6, entity.getRes_code());
ps.setInt(7, entity.getRes_time());
ps.setString(8, entity.getRes_msg());
ps.setString(9, entity.getResult());
ps.setString(10, entity.getLast_update_time());
ps.setString(11, entity.getRequest_url());
ps.setInt(12, entity.getHttp_code());
ps.setInt(13, entity.getRes_code());
ps.setInt(14, entity.getRes_time());
ps.setString(15, entity.getRes_msg());
ps.setString(16, entity.getResult());
ps.setString(17, entity.getLast_update_time());
ps.addBatch();
}
ps.executeBatch();
// connection.commit();
} catch (Exception e) {
log.error("insert or update database error", e);
} finally {
//jdbcUtil.closePreparedStatement(ps);//由主线程控制关闭
//jdbcUtil.closeConnection(connection);//由主线程控制关闭
}
System.out.println("i ="+i+" insertOrUpdate database over "+(System.currentTimeMillis()-timeLong)/1000+" s");
countdown.countDown();
}
}
测试结果是入库的时间快了一倍,每一个线程的入库时间基本相同。(如果用多线程一部分线程回慢,因为要等需要的connection)。。
求解释。。。 多线程下公用一个PreparedStatement ,是否回引发线程安全的问题。理论上上面应该会引发才对。但运行结果。。却没有。。。 还有一个隐形问题。。 多线程公用一个PreparedStatement ,在赋值时是否会有问题。比如:A线程吧值赋到B线程去了。 反正异常是没有报。
主线程代码如下:
package com.morningstar.api;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.morningstar.api.ApiConfig;
import com.morningstar.api.util.JDBCParameterCallBack;
import com.morningstar.api.util.JDBCUtil;
public class Application {
static Log log = LogFactory.getLog(Application.class);
private static String sqlString = "(api_name,id,id_type,request_url,http_code,state_code,response_time,state_msg,result,last_update) "
+ "value(?,?,?,?,?,?,?,?,?,?)"
+ " on duplicate key update request_url=?,http_code=?,state_code=?,response_time=?,"
+ "state_msg=?,result=?,last_update=?";
public static void main(String[] args) throws Exception {
ApiConfig apiConfig = new ApiConfig();
String url_param = "?username=" + apiConfig.getUsername()
+ "&password=" + apiConfig.getPassword() + "&format=json";
//获取ids集合
Map<String, List<String>> idTypeMap = apiConfig.getIdsMap();
//任务数
int taskCount = apiConfig.getTask_count();
JDBCUtil jdbcUtil = new JDBCUtil(apiConfig.getJdbc_url(),
apiConfig.getJdbc_name(), apiConfig.getJdbc_pass(),
apiConfig.getJdbc_driver(), apiConfig.getMax_conn(),
apiConfig.getMin_conn());
Connection con = null;
//API名字集合
List<String> apiNames = apiConfig.getApi_names();
//获取创建表sql
String createTableSql = apiConfig.getCreateTableSql();
Connection connection = jdbcUtil.getConnection();
for (String apiname : apiNames) {
String url_head = apiConfig.getApi_url() + apiname + "/";
jdbcUtil.insertOrUpdate("CREATE TABLE IF NOT EXISTS t_monitor_"+apiname+createTableSql);
String sql = "insert t_monitor_"+apiname+sqlString;
PreparedStatement ps = null;
ps = connection.prepareStatement(sql);
for (String idtype : idTypeMap.keySet()) {
Long starttimeLong = System.currentTimeMillis();
List<String> idsList = idTypeMap.get(idtype);
log.info("start download "+apiname+" " + idtype + " list size:"
+ idsList.size());
int number = idsList.size() / taskCount;
int rem = idsList.size() % taskCount;
if (rem != 0) {
taskCount++;
}
CountDownLatch countdown = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
int startIndex = i * number;
int endIndex = (i + 1) * number > idsList.size() ? idsList
.size() : (i + 1) * number;
List<String> ids = idsList.subList(startIndex, endIndex);
DownloadTask downloadTask = new DownloadTask(jdbcUtil, con,
ids, url_head + idtype + "/", url_param, sql,
apiname, idtype, countdown, i,ps);
Thread thread = new Thread(downloadTask);
thread.start();
}
try {
long timeout = 60;
countdown.await(timeout, TimeUnit.MINUTES);
} catch (InterruptedException e) {
log.error("thread is interrupted", e);
}
Long endtimeLong = System.currentTimeMillis();
log.info("end download "+apiname+" " + idtype + " list size:" + idsList.size()
+ " over time:" + (endtimeLong - starttimeLong) / 1000
+ "s");
}
jdbcUtil.closePreparedStatement(ps);
}
jdbcUtil.closeConnection(connection);
}
}
分享到:
相关推荐
以下是一个基本的多线程处理海量数据的步骤: 1. **配置线程池**:创建一个合适的线程池,例如,根据服务器性能和数据库连接限制设定线程数量。 2. **分块查询**:将查询语句设计为分页查询,每次只请求一定数量的...
这个项目可能是用Eclipse IDE创建的一个Java工程,包含示例代码来模拟多线程环境下JDBC的资源竞争问题,并提供解决方案。开发者可以通过运行这些示例,观察并发问题的表现,学习如何避免和解决这些问题。 6. 性能...
通过`DriverManager.getConnection()`方法可以获得一个`Connection`实例。 - **作用**:提供了与数据库交互的基本手段,如创建`Statement`、`PreparedStatement`或`CallableStatement`等对象。 - **生命周期管理**:...
在IT行业中,JDBCConnection是Java数据库连接(Java Database Connectivity)的一个重要概念,它是Java程序与各种数据库进行交互的基础。本项目"JdbcConnection"利用了JDBC API,结合Servlet和Tomcat服务器,以及...
2. **准备SQL语句**:接着,使用`Connection`对象的`prepareStatement(String sql)`方法创建一个`PreparedStatement`对象,其中`sql`参数是包含占位符的SQL语句。 ```java Connection con = DriverManager.get...
在Java编程中,多线程处理是提升程序性能和效率的重要手段,特别是在处理大量数据库数据时。本主题将深入探讨如何使用Java的并发包(java.util.concurrent)来实现多线程对数据库数据的批量处理,包括增、删、改等...
1. **多线程基础**:多线程是Java编程中的一个重要概念,允许程序同时执行多个任务。在本示例中,通过创建多个`Thread`对象并调用它们的`run()`方法来实现并行处理数据库操作。 2. **数据库连接管理**:在多线程...
- **Hibernate**:一个ORM(对象关系映射)框架,通过它可以使用Java对象直接操作数据库,简化了JDBC的使用。 - **Spring JdbcTemplate**和**Spring Data JPA**:Spring框架提供的工具,简化了数据库访问,提供了更...
在Java的JDBC编程中,`PreparedStatement`是一个非常重要的接口,它用于预编译SQL语句,提高了数据库操作的效率和安全性。当我们处理大量重复的SQL操作时,使用`PreparedStatement`可以避免SQL注入等问题,同时提升...
* 设置单例的一个实例方法 * * @return */ public static SqlHelper getInstance() { if (instance == null) { synchronized (SqlHelper.class) { instance = new SqlHelper(); } } return instance; } /** * ...
综上所述,通过JDBC连接程序实现数据库数据迁移是一个涉及多步骤的过程,包括连接数据库、编写查询、处理结果、批量插入、资源管理和错误处理。在实际操作中,应根据具体业务需求进行调整和优化,确保迁移的高效和...
`mysql-connector-odbc-3.51.28-win32.msi`是一个MySQL ODBC驱动程序的安装文件,安装后可以在ODBC数据源管理器中创建针对MySQL的数据源,然后使用ODBC连接字符串连接数据库。ODBC适用于那些不直接支持JDBC的应用...
2. **提高并发性能**:缓存的存在使得多个线程可以并行使用同一个预编译的SQL,减少了线程间的同步开销。 3. **节省资源**:创建和销毁`PreparedStatement`对象会消耗一定的资源,缓存可以有效减少这种开销。 以...
JDBC是Java API,它为Java程序员提供了一个标准的接口,用来访问各种关系型数据库。通过JDBC,开发者可以编写通用的代码来执行SQL语句,无需关心底层数据库的具体实现。 接下来,我们进入Eclipse环境,配置MySQL...
5. **处理结果集**:如果执行的是查询,`executeQuery()`会返回一个`ResultSet`对象,包含了查询的结果。可以遍历`ResultSet`获取数据。 6. **关闭资源**:完成数据库操作后,必须关闭所有打开的资源,包括`...
总之,"Java操作Oracle数据库-多线程.rar"提供的示例是一个很好的学习资源,涵盖了Java与Oracle数据库交互的基础,以及如何利用多线程技术来提升性能。通过深入研究源码,开发者不仅可以掌握基本的数据库操作,还能...
1. JDBC编程的6个基本步骤: - 加载和注册JDBC驱动:通过`Class.forName()`加载驱动类。 - 获取数据库连接:使用`DriverManager.getConnection()`创建连接。 - 创建Statement或PreparedStatement对象:根据需求...
MySQL的JDBC驱动称为MySQL Connector/J,它是一个符合Java Database Connectivity (JDBC) API规范的类型4纯Java驱动。这意味着它是一个独立于操作系统和数据库服务器的驱动,可以在任何支持Java的平台上运行。`mysql...
2. **建立连接**: 使用`DriverManager.getConnection()`方法,提供数据库URL、用户名和密码来创建一个`Connection`对象。 3. **创建Statement/PreparedStatement**: 通过`Connection`对象创建`Statement`或`...
多线程是指在一个进程中同时执行多个不同的线程。Java提供了丰富的多线程支持,通过`Thread`类和`Runnable`接口可以创建并运行新线程。在多线程环境中,每个线程可以独立地执行任务,从而提高了程序的并行处理能力。...