`
hope598
  • 浏览: 67045 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

大家帮忙看看这个多线程有问题没

阅读更多

整个流程是这样的,循环从一个文件中读取数据,每读6万条后就要暂停,启动三个线程(每次仅允许三个线程同时处理)来处理这6万条数据,处理结束后,再继续读。。。循环这样直到文件中数据全部处理完。大家帮忙看看,有什么问题没,例如处理流程、并发。。。等方面,多谢!

public class CopyOfTest {
	public static void main(String[] args) {
		Producer p = new Producer();
		while(p.producer() > 0){
			p.cunsumer();
		}
	}
}

class Producer {
	private Lock lock = new ReentrantLock();
	private Condition notEmpty = lock.newCondition();
	private Condition notFull = lock.newCondition();
	private List<List<String>> dataList = null;
	private BufferedReader  reader;
	private static ExecutorService executor = Executors.newFixedThreadPool(3);
	
	public Producer() {
		try {
			reader = new BufferedReader(new InputStreamReader(new FileInputStream(new File("D:/11.txt")), "GBK"));
			dataList = new ArrayList<List<String>>();
		} catch (UnsupportedEncodingException e1) {
			e1.printStackTrace();
		} catch (FileNotFoundException e1) {
			e1.printStackTrace();
		}
	}
	
	public int producer() {
		String text = "";
		List<String> tmp = new ArrayList<String>();
		int count = 0;
		lock.lock();
		try {
			try {
				System.out.println("+++++++++++");
				if (dataList.size() == 0) {
					notEmpty.signalAll();
				}
				if (dataList.size() == 3) {
					try {
						notFull.await();
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
				}
				while ((text = reader.readLine()) != null) {
					count++;
					System.out.println(count + "+++" + text);
					tmp.add(text);
					if (count % 20000 == 0) {
						dataList.add(tmp);
						tmp = new ArrayList<String>();
					}
					if (dataList.size() == 3) {
						break;
					}
				}
				if (count % 60000 != 0) {
					dataList.add(tmp);
				}
				tmp = null;
			} catch (IOException e1) {
				e1.printStackTrace();
			}
			if (dataList.size() < 1) {
				executor.shutdown();
			}
			return dataList.size();
		} finally {
			lock.unlock();
		}
	}
	
	public void cunsumer() {
		lock.lock();
		int len = dataList.size();
		Future<?>[] futureArr = new Future<?>[len];
		try {
			if (dataList.size() == 0) {
				try {
					notEmpty.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			if (dataList.size() == 3) {
				notFull.signalAll();
			}
			for (int i = 0; i < len; i++) {
				futureArr[i] = executor.submit(new SingleThread12(dataList.get(i)));
			}
			boolean flag = true;
			while (flag) {
				for (Future<?> f : futureArr) {
					flag = flag && f.isDone();
				}
				flag = !flag;
			}
			dataList = new ArrayList<List<String>>();
		} finally {
			lock.unlock();
		}
	}
}

 

 

0
0
分享到:
评论
18 楼 quietwater 2013-03-16  
这个问题我是这样分析的。
生产者读取文件内容,3万条一组,放入长度为3的阻塞队列,一旦读取文件完毕并放入阻塞队列,置生产者状态位为真,默认为假。
消费者从队列中拿一组数据,成功取出后启动一个线程处理该数据,并将计数器加一,默认为零。当计数器为3时,休眠再检查。当生产者状态为真,并且阻塞队列为空时退出。

这里生产者状态为volatile boolean
计数值需要同步增加和减少,包括查看个数
17 楼 lovexp2010 2013-03-13  
唔系好人 写道

  九楼的内部类,你不用继承线程???


实现Runnable接口的匿名类,呵呵~
16 楼 backkom1982 2013-03-13  
你现在的思路,可以用CountdownLatch类实现,不需要自己关心线程之间的调度。

更好的方式,其实同学已经提到了,使用阻塞队列用多个consumer和provider同时处理。

比阻塞队列性能更好的工具可以使用disruptor
15 楼 唔系好人 2013-03-13  
yuyue007 写道
看了评论,个人还是觉得用多线程处理比较好,而且在处理的时候,可以一边读,一般处理数据,这样能最大限度的利用资源,节约时间。

贴一个我写的代码,欢迎指正。

package test.thread.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author yole.yu <br/>
 * Created date: 2013-3-13 <br/>
 */
public class ProducerConsumerTest {

    public static void main(String[] args) throws InterruptedException{

        Producer producer = new Producer();
        producer.read();

        BlockingQueue<String> data = producer.getData();
        Consumer consumer = new Consumer(data, producer);
        consumer.consume();
    }
}

class Producer{
    private volatile boolean endOfFile = false;
    private volatile int dataSequence=0;
    // store data, it's a warehouse
    private BlockingQueue<String> data = new ArrayBlockingQueue<String>(60);

    public void read(){
        //use a new thread to process read 
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("start produce...");
                while(!endOfFile){
                    try {
                        // read data from your file, I use dataSequence to identify data
                        data.put(++dataSequence + "");
                        System.out.println("produced " + dataSequence);
                        //assume dataSequence=123 means end of file
                        if(dataSequence == 123){
                            endOfFile = true;
                            System.out.println("all data have been read");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

    }

    public BlockingQueue<String> getData(){
        return data;
    }

    public boolean isEndOfFile() {
        return endOfFile;
    }
}

class Consumer{
    private int threadSize=3;
    private BlockingQueue<String> datas;
    private Producer producer;

    public Consumer(BlockingQueue<String> data, Producer producer){
        this.datas = data;
        this.producer = producer;
    }
    
    public void consume(){
        System.out.println("start consume..");
        ExecutorService pool = Executors.newFixedThreadPool(threadSize);
        for(int i=0; i<3; i++){
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    while(datas.size()>0 || !producer.isEndOfFile()){
                        try {
                            String data = datas.take();
                            System.out.println(Thread.currentThread().getName() + "-----" + data);
                            TimeUnit.MILLISECONDS.sleep(300);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        pool.shutdown(); 
    }

    public int getThreadSize() {
        return threadSize;
    }

    public void setThreadSize(int threadSize) {
        this.threadSize = threadSize;
    }
}

  九楼的内部类,你不用继承线程???
14 楼 yuyue007 2013-03-13  
beykery 写道
我奇怪你为何不使用阻塞队列,你这个逻辑直接用阻塞队列,写起来更简单。

下面写了个阻塞队列的,貌似他不感兴趣哦。。
13 楼 beykery 2013-03-13  
我奇怪你为何不使用阻塞队列,你这个逻辑直接用阻塞队列,写起来更简单。
12 楼 hope598 2013-03-13  
zjhlht 写道
advantech 写道
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。


同意!这个方法推荐~~~~~避免了很多问题

并且读的速度远远大于处理的速度,读完6万笔数据后,业务的处理要持续1、2分钟,这个时间段读操作要阻塞的
11 楼 kidding87 2013-03-13  
zjhlht 写道
advantech 写道
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。


同意!这个方法推荐~~~~~避免了很多问题



边读边处理不是更好些
10 楼 hope598 2013-03-13  
zjhlht 写道
advantech 写道
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。


同意!这个方法推荐~~~~~避免了很多问题

内存中最多只允许存在三个线程在处理业务,不能一直的读数据出来,因为数据量比较大,经常会出现内存溢出。。。
9 楼 yuyue007 2013-03-13  
看了评论,个人还是觉得用多线程处理比较好,而且在处理的时候,可以一边读,一般处理数据,这样能最大限度的利用资源,节约时间。

贴一个我写的代码,欢迎指正。

package test.thread.concurrent;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * @author yole.yu <br/>
 * Created date: 2013-3-13 <br/>
 */
public class ProducerConsumerTest {

    public static void main(String[] args) throws InterruptedException{

        Producer producer = new Producer();
        producer.read();

        BlockingQueue<String> data = producer.getData();
        Consumer consumer = new Consumer(data, producer);
        consumer.consume();
    }
}

class Producer{
    private volatile boolean endOfFile = false;
    private volatile int dataSequence=0;
    // store data, it's a warehouse
    private BlockingQueue<String> data = new ArrayBlockingQueue<String>(60);

    public void read(){
        //use a new thread to process read 
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("start produce...");
                while(!endOfFile){
                    try {
                        // read data from your file, I use dataSequence to identify data
                        data.put(++dataSequence + "");
                        System.out.println("produced " + dataSequence);
                        //assume dataSequence=123 means end of file
                        if(dataSequence == 123){
                            endOfFile = true;
                            System.out.println("all data have been read");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();

    }

    public BlockingQueue<String> getData(){
        return data;
    }

    public boolean isEndOfFile() {
        return endOfFile;
    }
}

class Consumer{
    private int threadSize=3;
    private BlockingQueue<String> datas;
    private Producer producer;

    public Consumer(BlockingQueue<String> data, Producer producer){
        this.datas = data;
        this.producer = producer;
    }
    
    public void consume(){
        System.out.println("start consume..");
        ExecutorService pool = Executors.newFixedThreadPool(threadSize);
        for(int i=0; i<3; i++){
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    while(datas.size()>0 || !producer.isEndOfFile()){
                        try {
                            String data = datas.take();
                            System.out.println(Thread.currentThread().getName() + "-----" + data);
                            TimeUnit.MILLISECONDS.sleep(300);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
        pool.shutdown(); 
    }

    public int getThreadSize() {
        return threadSize;
    }

    public void setThreadSize(int threadSize) {
        this.threadSize = threadSize;
    }
}
8 楼 zjhlht 2013-03-13  
advantech 写道
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。


同意!这个方法推荐~~~~~避免了很多问题
7 楼 advantech 2013-03-12  
这个程序的思路可以这样,你建n个长度1000的队列,读程序往队列里读,读满一个就把该队列放到一个线程去处理,然后换下一个空队列继续读。
线程处理完后将空队列重新交给读程序,如此循环。根本不需要用锁。
6 楼 derta2009 2013-03-12  
额,你这看的我晕啊,我按你的需求,大概写了例子意思一下啊。
public class ThreadPoolTest {

	public static void main(String[] args) {
		ThreadPoolTest tp=new ThreadPoolTest();
		ExecutorService es = Executors.newFixedThreadPool(6);//一次6个线程同时运行,每个线程处理10000条数据
		for(int i=0;i<10;i++){
			es.execute(tp.new newThread(i));
		}
		es.shutdown();
	}
	
	class newThread  extends Thread{
		private int i ;
		
		public newThread(int i) {
			this.i = i;
		}
		List<Integer> list = new ArrayList<Integer>();
		@Override
		public void run() {

			//读取一万调数据
			for(int i=0;i<10000;i++){
				list.add(i);
			}
			
			//处理数据
			System.out.println("处理了10000条数据");
		}
		
	}
}
5 楼 hope598 2013-03-12  
这个说成多线程的确不合适。。。还请多多指点,3q
4 楼 java_min 2013-03-12  
你确定你写的是多线程吗,好好看看多线程什么意思,多线程是怎么处理数据的。
3 楼 hope598 2013-03-12  
因为业务处理时间有点长,如果一次性全读出来,会出现内存溢出。。。
2 楼 panggezi 2013-03-12  
为啥非得等到6万条处理完producer才继续读?不然用BlockingQueue多方便。
1 楼 zj304292653 2013-03-12  
我觉得你这个消费者用三个线程没有意义,整个消费流程都被锁住了,还不如单线程

相关推荐

    有问题的QT程序,请大家帮忙看一下吧

    有问题的QT程序 这个程序的功能是客户端向服务器发送一个字符串,服务器接收到这个字符串之后,进行处理(复制一个相同的...(提示说不能向别的线程发送信号/数据,但我所有的问题都是在同一个线程里面处理的啊?)

    小米云相册导出至本地 原图下载 支持多线程

    多线程有bug 下载结束后 自动关闭软件 不急着要的话 建议单线程比较稳定 小米的cookie 打开云相册 刷新后 按f12 抓包获取 小米cookie 有时效 有一个接口可以直接获取 等v2.0后发布更新 大家先用着 昨天其实也发布...

    VFW(在WIN7下会黑屏共享后希望能有大拿帮忙解决一下)

    在标题中提到的问题,即"在WIN7下会黑屏共享后希望能有大拿帮忙解决一下",这可能是因为在尝试使用VFW进行摄像头共享时遇到了显示异常。在Windows 7中,摄像头的访问通常推荐使用DirectShow或Media Foundation,因为...

    用C#做的一个聊天软件

    这段代码是百度好友为我解惑帮忙写的两份聊天软件的其中之一,两个功能是一样的,完全可以实现点对点的聊天,只不过一个是用到了异步另一个是多线程同步(其实我觉得通过多线程控制窗体会使得程序清晰易懂,虽说会...

    leetcode下载-hy_computerAndJava_basic:计算机基础知识知识

    在那个时候基础不好(没学过操作系统),对jvm和多线程只是死记硬背,很多概念根部不理解 只有系统的学习,才能更深的理解 提升 高并发架构(消息队列,搜索引擎,缓存,数据库高级)-&gt;分布式系统-&gt;springcloud微服务 -&gt;k8s ...

    Project

    标题中的"Project"通常指的是一个软件开发项目,这可能是一个包含多个组件的复杂系统,而描述中的"帮帮忙吧,谢谢"则暗示了用户可能在处理这个项目时遇到了问题,需要帮助解决。从标签"1"来看,它可能是对项目阶段、...

    买火车票引发的执行力思考.pdf

    他可以利用多线程策略,邀请同事帮忙排队,提高购票成功率。此外,小王还可以考虑其他交通方式,如飞机、汽车等,灵活应对。 2. **运用人际关系网络**:在社会关系网络中寻求帮助是提升执行力的一种方法。小王可以...

    心蓝12306购票软件2013年1月6日最新版

    这个问题是由于您的证书安装不正确导致,请重新下载证书,并按里面的“SRCA根证书安装说明手册”重新安装即可解决问题。如果还不行,这也是您自己电脑的问题,请联系您的网络管理员或者与我们的客服取得联系以取得...

    易语言NetDB数据库操作中间件

    3、**本次更新包括数据库读写的权限控制,测试环境有限,有问题再改进,这个只是个雏形,后期还会更强。 4、**主要演示文件是服务端和网盘,请详细阅读。 ================ [2020-2-4日] ================ 1、**...

    CoreData:Demo中详细讲解了CoreData的各种操作,并且给出了详细的注释。写技术文章不容易,希望各位能帮忙点个Star,谢谢!

    我在简书的博客中写了一系列总计六篇的CoreData文章,总字数大概3W+,从CoreData的基础使用到使用进阶,再到多线程、版本迁移等高级用法,讲解非常详细。 但CoreData的学习还是应该偏实践,我根据博客中讲到的知识点...

    精易模块[源码] V5.15

    11、修复“线程_等待”命令注释反的问题,返回真表示线程结束,假表示已超时。感谢易友【tone】反馈。 12、修复“类_识图-&gt;找图_从字节集”命令,载入大文件直接奔溃的BUG,感谢易友【tone】反馈。 精易模块 V3.80...

    神枫炼狱新手攻略_免费百度网盘下载提速_炼狱_网盘_

    3. **神枫炼狱新手必看.txt**:这个文本文件极有可能是简洁明了的新手教程,涵盖了一些基本操作、游戏设定和注意事项,对于初入游戏的玩家来说是非常实用的参考资料。 综上所述,这篇内容不仅讲解了如何提升百度...

    IO多路复用.md

    - **多路IO复用**是一种同步IO技术,它允许一个线程通过记录多个IO流的状态来管理多个IO操作。值得注意的是,异步IO必定是非阻塞的,不存在所谓的“异步阻塞”。 #### 多种IO模型示例 - **阻塞IO**:形象地比喻为...

    CodingBeerBar:支持在线悬赏、解答、交流的平台

    1. 提出问题:当遇到编程难题时,用户可以详细描述问题,设定悬赏金额,吸引更多有经验的开发者来帮忙解决。 2. 回答问题:擅长特定领域的用户可以浏览悬赏问题,提供解决方案,获取悬赏奖励,并在解决问题的过程中...

    Java设计模式

    **应用场景**:多线程环境下的资源管理。 ##### 5. **工厂方法模式 (Factory Method Pattern)** 定义一个创建产品对象的接口,但让实现这个接口的类决定具体实例化哪一个类。工厂方法让类的实例化推迟到子类。 **...

    my-python-practices:我的python做法

    my-python-practicessimplemultithreadsCrawler.py:一个简单的多线程生产者消费者爬虫,实际使用的时候可以重写parser的parse_links方法,来写自己的解析规则,然后将解析后将继续要爬的地址放入url队列,生产者会...

    c代码-小游戏,有错,哪里错了呢?

    从标题和描述来看,这似乎是一个遇到了问题的小游戏项目。让我们来探讨一下可能出错的地方,并提供一些排查错误的策略。 首先,让我们分析`main.c`这个文件。它是C程序的主要入口点,包含了程序的主函数`main()`。...

    .NET 分布式组件库 Exceptionless Foundatio.zip

    如果您觉得本文对您有帮助,想让更多人了解Exceptionless,感谢您帮忙点的【推荐】。如果您对 Exceptionless 感兴趣或者是想学习 Exceptionless 的代码,可以加入群Exceptionless QQ群:330316486。 ...

Global site tag (gtag.js) - Google Analytics