`
jiangzhoubai
  • 浏览: 58721 次
  • 性别: Icon_minigender_1
  • 来自: 太原
社区版块
存档分类
最新评论

多线程和同步队列

阅读更多

最近要割接个项目,要把另外一个公司的数据库里的一张表倒到我们库里,数据有一亿三千多万吧。正号也符合生产者和消费者的状况。以前用过点线程池和同步队列,,写个例子,让大家拍砖。不多说了,直接上代码,

1. 线程池

 

 

public class ThreadPool {

private ExecutorService exe = null;// 线程池

 

private int pool_size;

private  Service  service ;

public ThreadPool(int pool_size, Service  service   {

this.pool_size = pool_size;

this. service  = service  ;

exe = Executors.newFixedThreadPool(pool_size);// 创建线程池

System.out.println("the server is ready");

}

 

/**

* 运行循环实例线程,根据要实例的线程个数,传入条件ID

* @param worknum

*/

public void server() {

int i = 0;

while (i < pool_size) {

// 实例指定个线程

InsertData t = new InsertData( service  );

exe.execute(t);// 放入线程池

i++;

}

}

}

2.线程
public class InsertData extends Thread {
private Service  service ;
public InsertData( Service  service  ){
this. service  = service  ;
}
@Override
public void run() {
SyncDataQueue syncDataQueue=SyncDataQueue.getInstance();
while (true) {
List< Mode > list=new ArrayList<Mode>();
//移除队列里的所有实体,并批量添加
syncDataQueue.getBlockingQueue().drainTo(list);
if (list!=null) {
service.batchSvae(list);
}
}
}
}
3.实现同步队列
public class SyncDataQueue {

private static SyncDataQueue instance = null;
private BlockingQueue< Mode > blockingQueue = null;

public SyncDataQueue() {
  //队列容量
blockingQueue = new ArrayBlockingQueue< Mode >(40000);
}

public static SyncDataQueue getInstance() {
if (instance == null) {
instance = new SyncDataQueue();
}
return instance;
}

public BlockingQueue< Mode > getBlockingQueue() {
return blockingQueue;
}
}
4.导入文件,并启动线程池
public class FileThread extends Thread {

private String path;
public FileThread(String path) {
this.path = path;
}

@Override
public void run() {
if (StringUtils.isNotBlank(path)) {
try {
getSmsListByFileThread(new File(path));
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void getSmsListByFileThread(File file)
throws FileNotFoundException, IOException, InterruptedException {
//初始化同步队列
SyncDataQueue syncDataQueue = SyncDataQueue.getInstance();
BlockingQueue< Mode > blockingQueue = syncDataQueue.getBlockingQueue();
// 缓冲大小为10M
final int BUFFER_SIZE = 10 * 1024 * 1024;
BufferedInputStream fis = new BufferedInputStream(new FileInputStream(file));
// 用10M的缓冲读取文本文件
BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"), BUFFER_SIZE);
String line = "";
int i=0;
while ((line = reader.readLine()) != null) {
String[] data = line.split(",");
Mode mode=new Mode();
//填充实体
//放进队列 让 线程去抢队列的里实体
blockingQueue.put(mode);
}
}

public String getPath() {
return path;
}

public void setPath(String path) {
this.path = path;
}
/**
*  不加索引每分钟百万上下, 加了索引有点惨不忍睹。
*/
public static void main(String[] args) {
Service service=(Service)Syscontext.getBean(" service ");
//初始化线程池,并启动
ThreadPool threadPool = new ThreadPool(30,smsHouseSmsService);
threadPool.server();
//每个文件都是1G的
new FileThread("C:/Users/bjz/Desktop/mcexport/Msg_20120322104001_1_1.txt").start();
........
new FileThread("C:/Users/bjz/Desktop/mcexport/Msg_20120322104001_1_15.txt").start();
}
}

 

0
0
分享到:
评论
5 楼 gamehiboy 2012-04-10  
求Service类的代码,
4 楼 jiangzhoubai 2012-04-09  
gamehiboy 写道
Service类是啥,

插入数据的service
3 楼 gamehiboy 2012-04-09  
Service类是啥,
2 楼 jiangzhoubai 2012-03-30  
netkiller.github.com 写道
:-) 我也刚刚做过类似的东西,我是造数据,python 实现比java 省事

就是开几个进程,在哪里守候,每个进程可以开启N个线程。

剩下就是往Queue里面赛东西,那些守护进程从queue里面拿任务。。

有兴趣看看
http://netkiller-github-com.iteye.com/blog/1453700


好的
1 楼 netkiller.github.com 2012-03-30  
:-) 我也刚刚做过类似的东西,我是造数据,python 实现比java 省事

就是开几个进程,在哪里守候,每个进程可以开启N个线程。

剩下就是往Queue里面赛东西,那些守护进程从queue里面拿任务。。

有兴趣看看
http://netkiller-github-com.iteye.com/blog/1453700

相关推荐

Global site tag (gtag.js) - Google Analytics