论坛首页 入门技术论坛

线程并发问题。每次读取一批数据,并把这些数据分发给内部线程组去处理。

浏览 2227 次
该帖已经被评为新手帖
作者 正文
   发表时间:2007-05-17  
OO
小弟请教各位一个问题:
有一个主线程,主线程里有一个子线程组(大概8个子线程)。逻辑是主线程每一次从数据库里取一批数据,并分发给线程内部的子线程去进行去对外发送。并可以知道发送的次数。在主程在数据里取数据已经完成之后,那就停止线程。

本人已经写了一个代码,不过还是有问题。各位大侠,请。
package com.thread;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.log4j.Logger;

public class MultiThread2 extends Thread {

private static Logger logger = Logger.getLogger(MultiThread2.class);

// 加一些基础信息,让WEB页面可以刷新的时候可以看到发送状态
//
private String requestData = null;

private Object lock;

private Object submitLock;

private Object startLock;

//private Object stopLock;

private InnerThread[] innerThreads;

private boolean tag = true;

public MultiThread2() {
try {
lock = new Object();
submitLock = new Object();
startLock = new Object();
int maxThreadNum = 3;
innerThreads = new InnerThread[maxThreadNum];
} catch (Exception e) {
logger.info("创建线程失败:" + e.getMessage());
}
}

public void startThread() {
synchronized (startLock) {
if (tag) {
for (int i = 0; i < innerThreads.length; i++) {
innerThreads[i] = new InnerThread(this);
innerThreads[i].start();
}
this.start();// 启动主线程
}
}
}
public void stopInnerThread(){
for (int i = 0; i < innerThreads.length; i++) {
innerThreads[i].interrupt();
}
this.interrupt();
}
public void submitRequest(InnerThread innerThread)
throws InterruptedException {
synchronized (submitLock) {
while (tag) {
synchronized (this) {
if (this.requestData != null) {
innerThread.setXmlRequestData(requestData);
this.requestData = null;
this.notify();
break;
} else {
this.wait();
}
}
}
}
}

public void run() {// 主线程
List markList = new ArrayList();
setValue2List(markList);
while (tag) {
// 取出一部分数据提交
logger.info("记录数:" + markList.size());
Iterator itr = markList.iterator();
while (itr.hasNext() && tag) {
String value = (String) itr.next();
itr.remove();
if (value != null) {
try {
// 控制速度
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lock) {
while (tag) {
synchronized (this) {
if (this.requestData == null) {
this.requestData = value;
logger.info("将要发送的值是:"+value);
this.notify();
break;
} else {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}// synchronized(lock)
}// vo != null
}// while(itr.hasNext())
try {
sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
tag = false;
//stopInnerThread();
}
}

private void setValue2List(List markList) {
markList.add("记录1");
markList.add("记录2");
markList.add("记录3");
markList.add("记录4");
markList.add("记录5");
// markList.add("记录6");
// markList.add("记录7");
// markList.add("记录8");
// markList.add("记录9");
// markList.add("记录10");
// markList.add("记录11");
// markList.add("记录12");
// markList.add("记录13");
// markList.add("记录14");
// markList.add("记录15");
}

class InnerThread extends Thread {

private String xmlRequestData = null;

private MultiThread2 multiThread;

public InnerThread(MultiThread2 multiThread) {
this.multiThread = multiThread;
}

public void run() {
while (tag) {
try {
multiThread.submitRequest(this);
} catch (InterruptedException e) {
return;
}
//logger.info("----------->>>>>>>>>>>出现多少次,就发送多少次<<<<<<<<<<<----------");
// int resendCount = 0;
// boolean sendOk = false;
// Random random = new Random(3);
logger.info("发送值是(xmlRequestData) = " + xmlRequestData);
if (xmlRequestData!=null && xmlRequestData.equals("")){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// while (!sendOk && resendCount < 3) {
// int i = random.nextInt();
// if (i == 1) {
// sendOk = true;
// } else {
// resendCount++;
// logger.info("随机数 i = " + i + " 重试次数:" + resendCount
// + " xmlRequestData = "+xmlRequestData);
// }
// }
}
}

public void setXmlRequestData(String xmlRequestData) {
this.xmlRequestData = xmlRequestData;
}
}

public static void main(String[] args) {
MultiThread2 t = new MultiThread2();
t.startThread();
}

public boolean isTag() {
return tag;
}

public void setTag(boolean tag) {
this.tag = tag;
}
}
论坛首页 入门技术版

跳转论坛:
Global site tag (gtag.js) - Google Analytics