在人人网海量存储系统的存储引擎部分,为了提高CPU和网络的使用情况,使用了java多线程管理并行操作的方式。
在java中控制线程是一件很简单的事情,jdk提供了诸多的方法,其中比常用的两个是notify()和wait(),一个是唤醒,一个等待线程,在下面的代码中,将看到一个线程分配器,根据cpu的负载情况,自动完成对应线程的唤醒或者是等待操作。整个过程是一个平滑的过程,不会因为线程的切换而导致机器负载出线锯齿。
先看一个类,读取Linux系统TOP等指令拿到系统当前负载:
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
* 节点的cpu 内存 磁盘空间 情况
*
* @author zhen.chen
*
*/
public class NodeLoadView {
/**
* 获取cpu使用情况
*
* @return
* @throws Exception
*/
public double getCpuUsage() throws Exception {
double cpuUsed = 0;
Runtime rt = Runtime.getRuntime();
Process p = rt.exec(“/usr/bin/uptime”);// 调用系统的“top”命令
String[] strArray = null;
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(p.getInputStream()));
String str = null;
while ((str = in.readLine()) != null) {
strArray = str.split(“load average: “);
strArray = strArray[1].split(“,”);
cpuUsed = Double.parseDouble(strArray[0]);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
in.close();
}
return cpuUsed;
}
/**
* 内存监控
*
* @return
* @throws Exception
*/
public double getMemUsage() throws Exception {
double menUsed = 0;
Runtime rt = Runtime.getRuntime();
Process p = rt.exec(“top -b -n 1″);// 调用系统的“top”命令
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(p.getInputStream()));
String str = null;
String[] strArray = null;
while ((str = in.readLine()) != null) {
int m = 0;
if (str.indexOf(” R “) != -1) {// 只分析正在运行的进程,top进程本身除外 &&
//
// System.out.println(“——————3—————–”);
strArray = str.split(” “);
for (String tmp : strArray) {
if (tmp.trim().length() == 0)
continue;
if (++m == 10) {
// 9)–第10列为mem的使用百分比(RedHat 9)
menUsed += Double.parseDouble(tmp);
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
in.close();
}
return menUsed;
}
/**
* 获取磁盘空间大小
*
* @return
* @throws Exception
*/
public double getDeskUsage() throws Exception {
double totalHD = 0;
double usedHD = 0;
Runtime rt = Runtime.getRuntime();
Process p = rt.exec(“df -hl”);// df -hl 查看硬盘空间
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(p.getInputStream()));
String str = null;
String[] strArray = null;
while ((str = in.readLine()) != null) {
int m = 0;
// if (flag > 0) {
// flag++;
strArray = str.split(” “);
for (String tmp : strArray) {
if (tmp.trim().length() == 0)
continue;
++m;
// System.out.println(“—-tmp—-” + tmp);
if (tmp.indexOf(“G”) != -1) {
if (m == 2) {
// System.out.println(“—G—-” + tmp);
if (!tmp.equals(“”) && !tmp.equals(“0″))
totalHD += Double.parseDouble(tmp.substring(0,
tmp.length() – 1)) * 1024;
}
if (m == 3) {
// System.out.println(“—G—-” + tmp);
if (!tmp.equals(“none”) && !tmp.equals(“0″))
usedHD += Double.parseDouble(tmp.substring(0,
tmp.length() – 1)) * 1024;
}
}
if (tmp.indexOf(“M”) != -1) {
if (m == 2) {
// System.out.println(“—M—” + tmp);
if (!tmp.equals(“”) && !tmp.equals(“0″))
totalHD += Double.parseDouble(tmp.substring(0,
tmp.length() – 1));
}
if (m == 3) {
// System.out.println(“—M—” + tmp);
if (!tmp.equals(“none”) && !tmp.equals(“0″))
usedHD += Double.parseDouble(tmp.substring(0,
tmp.length() – 1));
// System.out.println(“—-3—-” + usedHD);
}
}
}
// }
}
} catch (Exception e) {
e.printStackTrace();
} finally {
in.close();
}
return (usedHD / totalHD) * 100;
}
//
// public static void main(String[] args) throws Exception {
// NodeLoadView cpu = new NodeLoadView();
// System.out
// .println(“—————cpu used:” + cpu.getCpuUsage() + “%”);
// System.out
// .println(“—————mem used:” + cpu.getMemUsage() + “%”);
// System.out
// .println(“—————HD used:” + cpu.getDeskUsage() + “%”);
// System.out.println(“————jvm监控———————-”);
// Runtime lRuntime = Runtime.getRuntime();
// System.out.println(“————–Free Momery:” + lRuntime.freeMemory()
// + “K”);
// System.out.println(“————–Max Momery:” + lRuntime.maxMemory()
// + “K”);
// System.out.println(“————–Total Momery:”
// + lRuntime.totalMemory() + “K”);
// System.out.println(“—————Available Processors :”
// + lRuntime.availableProcessors());
// }
}
再来看关键的一个类,THreadScheduler:
import java.util.Map;
import org.apache.log4j.Logger;
import test.NodeLoadView;
public class ThreadScheduler {
private static Logger logger = Logger.getLogger(ThreadScheduler.class.getName());
private Map<String, Thread> runningThreadMap;
private Map<String, Thread> waitingThreadMap;
private boolean isFinished = false;
private int runningSize;
public ThreadScheduler (Map<String, Thread> runningThreadMap, Map<String, Thread> waitingThreadMap) {
this.runningThreadMap = runningThreadMap;
this.waitingThreadMap = waitingThreadMap;
this.runningSize = waitingThreadMap.size();
}
/**
* 开始调度线程
* @author zhen.chen
* @createTime 2010-1-28 上午11:04:52
*/
public void schedule(){
long sleepMilliSecond = 1 * 1000;
int allowRunThreads = 15;
// 一次启动的线程数,cpuLoad变大时以此值为参考递减
int allowRunThreadsRef = 15;
double cpuLoad = 0;// 0-15
NodeLoadView load = new NodeLoadView();
while (true) {
try {
cpuLoad = load.getCpuUsage();
} catch (Exception e1) {
e1.printStackTrace();
}
// cpuLoad低 启动的线程多
allowRunThreads = (int) Math.floor(allowRunThreadsRef – cpuLoad);
// threads不能为0
if (allowRunThreads < 1) {
allowRunThreads = 1;
}
if (allowRunThreads > allowRunThreadsRef) {
allowRunThreads = allowRunThreadsRef;
}
if (logger.isDebugEnabled()) {
logger.debug(“[ThreadScheduler]running Thread:” + runningThreadMap.size() + “; waiting Thread:” + waitingThreadMap.size() + “; cpu:” + cpuLoad + ” allowRunThreads:” + allowRunThreads);
}
// 检查runningSize个线程的情况,满足条件则启动
for (int x = 0; x < runningSize; x++) {
if (waitingThreadMap.get(x+”") != null) {
if (allowRunThreadsRef <= runningThreadMap.size()) {
break;
}
synchronized (waitingThreadMap.get(x+”")) {
if (!waitingThreadMap.get(x+”").isAlive()) {
waitingThreadMap.get(x+”").start();
}else{
waitingThreadMap.get(x+”").notify();
}
}
runningThreadMap.put(x+”", waitingThreadMap.get(x+”"));
waitingThreadMap.remove(x+”");
}
}
// 检查runningSize个线程的情况,满足条件则暂停
for (int x = 0; x < runningSize; x++) {
if (runningThreadMap.size() <= allowRunThreads) {
break;
}
if (runningThreadMap.get(x+”") != null) {
synchronized (runningThreadMap.get(x+”")) {
try {
if (runningThreadMap.get(x+”").isAlive()) {
runningThreadMap.get(x+”").wait();
}else{
continue;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
waitingThreadMap.put(x+”", runningThreadMap.get(x));
runningThreadMap.remove(x+”");
}
}
// 全部跑完,返回
if (waitingThreadMap.size() == 0 && runningThreadMap.size() == 0) {
if (logger.isDebugEnabled()) {
logger.debug(“[ThreadScheduler] over.total Threads size:” + runningSize);
}
this.isFinished = true;
return;
}
// 使主while循环慢一点
try {
Thread.sleep(sleepMilliSecond);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
public boolean isFinished() {
return isFinished;
}
}
这个类的作用:
1.接收runningThreadMap和waitingThreadMap两个map,里面对应存了运行中的线程实例和等待中的线程实例。
2.读cpu情况,自动判断要notify等待中的线程还是wait运行中的线程。
3.两个map都结束,退出。(必须runningThreadMap内部的Thread自己将runningThreadMap对应的Thread remove掉)
如何使用:
public class TestThread {
public static class Runner extends Thread {
public Runner(int j, Map<String, Thread> threadMap) {
}
public void run() {
// TODO 你的逻辑 完成后需要从threadMap中remove掉
}
}
public static void main(String[] args) {
// 运行中的线程
Map<String, Thread> threadMap = new HashMap<String, Thread>();
// 正在等待中的线程
Map<String, Thread> waitThreadMap = new HashMap<String, Thread>();
for (int j = 0; j < args.length; j++) {
Thread t = new Runner(j, threadMap);
waitThreadMap.put(j + “”, t);
}
ThreadScheduler threadScheduler = new ThreadScheduler(threadMap, waitThreadMap);
threadScheduler.schedule();
if (threadScheduler.isFinished() == false) {
//没能正常结束
}
}
}
转载自五四陈科学院[http://www.54chen.com]
分享到:
相关推荐
- JUC包含了许多高级并发组件,如`Semaphore`信号量、`CyclicBarrier`回环栅栏、`CountDownLatch`倒计时器和`Exchanger`交换器等,它们可以帮助控制线程的执行顺序。 - `Semaphore`可以限制同时访问特定资源的线程...
- Java线程有10个优先级,`Thread.MIN_PRIORITY`(1)、`Thread.NORM_PRIORITY`(5)和`Thread.MAX_PRIORITY`(10),但实际操作系统的调度并不完全依赖优先级。 7. **守护线程(Daemon)**: - 守护线程不阻碍...
### Java线程培训资料知识点详解 #### 一、Java线程基本概念 1. **如何编写与启动线程** - **方式一:继承Thread类** ```java class MyThread extends Thread { @Override public void run() { // 业务逻辑 ...
- Java线程的状态包括新建(New)、就绪(Runnable)、运行(Running)、阻塞(Blocked)、等待(Waiting)、时间等待(Timed Waiting)和终止(Terminated)。 - 线程可以通过调用特定的方法改变其状态,例如 `start()`、`sleep()...
Java线程的状态包括新建 (`NEW`)、就绪 (`RUNNABLE`)、阻塞 (`BLOCKED`)、等待 (`WAITING`)、超时等待 (`TIMED_WAITING`) 和终止 (`TERMINATED`)。 #### 五、Java线程:线程的同步与锁 - **线程同步** 线程同步...
Java多线程是Java编程中的重要概念,它允许程序同时执行多个任务,提高了程序的效率和响应性。本文将深入探讨Java多线程中的关键知识点,包括创建线程、主线程、线程优先级、线程组、线程同步以及线程间的通信。 1....
Java线程是Java编程语言中的核心概念,尤其在多任务处理和并发编程中扮演着重要角色。线程允许一个程序内部同时执行多个独立的控制流,使得程序能够更高效地利用处理器资源。本文将深入解析Java线程的相关知识点,...
在Java编程环境中,单线程指的是程序执行过程中只有一个线程在运行。这意味着任何时刻只能执行一个任务,上一个任务完成后才会进行下一个任务。单线程模型简化了程序设计,降低了程序复杂度,使得开发者可以更专注于...
在Java编程语言中,线程是程序执行的基本单元,它允许程序并发地执行多个任务。在多线程环境中,程序的...在编写`test`这样的示例时,你可以创建并启动线程,测试各种线程控制和同步方法,从而加深对Java线程的理解。
Java线程有10个优先级(MIN_PRIORITY, NORM_PRIORITY, MAX_PRIORITY),默认优先级是NORM_PRIORITY。但是,线程优先级并不保证绝对的执行顺序,操作系统调度策略可能影响实际执行顺序。 7. join()方法: 一个线程...
Java多线程是Java编程语言中一个非常重要的概念,它允许开发者在一个程序中创建多个执行线程并行运行,以提高程序的执行效率和响应速度。在Java中,线程的生命周期包含五个基本状态,分别是新建状态(New)、就绪...
Java线程是Java编程中的重要概念,它允许程序同时执行多个任务,从而提高了程序的效率和响应性。在Java中,线程是程序执行的最小单位,由Java虚拟机(JVM)管理。本教程将深入讲解Java线程的基础知识,帮助你掌握多...
Java线程是多任务编程的重要..."简单的Java线程demo"可能包含以上提到的一些示例代码,通过实践运行和调试这些代码,可以帮助我们更好地理解和运用Java线程。记得在学习过程中,不断地进行实验和测试,以便加深理解。
Java线程的状态可以分为新建、就绪、运行、阻塞和死亡五个阶段。 - **新建状态**:当使用`new`关键字创建一个线程对象后,该线程处于新建状态。 - **就绪状态**:调用`start()`方法后,线程进入就绪状态等待CPU时间...
Java多线程程序设计是Java开发中的重要概念,它允许在一个程序中同时执行多个任务,提高了应用程序的效率和响应性。在Java中,有两种主要的方式来实现多线程:继承Thread类和实现Runnable接口。 首先,我们要理解多...
根据提供的信息,我们可以深入探讨Java线程同步以及代码示例中的关键知识点。 ### Java线程同步实例:哲学家就餐问题 #### 1. 哲学家就餐问题简介 哲学家就餐问题是多线程编程中一个经典的同步问题。该问题描述为...
### Java多线程知识点解析 #### 一、Java多线程概述 Java作为一种现代编程语言,内置了...通过以上分析,我们不仅了解了Java多线程的基本概念和技术细节,还通过具体的代码示例深入了解了多线程在实际编程中的应用。
本文档主要介绍了Java中的多线程编程,包括其基础概念、实现方法、线程控制等方面。 **基本概念** - **进程**:是指计算机正在执行的一个程序实例,比如一个`.class`文件或`.exe`文件。 - **线程**:是进程中的一...
4. **线程状态**:Java线程有五种状态:新建、就绪、运行、阻塞和终止。了解这些状态有助于理解线程的生命周期。 5. **同步机制**:Java提供了多种同步机制防止数据竞争,如`synchronized`关键字、`wait()`, `...
4. **线程状态**:Java线程有新建、就绪、运行、阻塞和死亡五种状态,可以通过`getState()`获取。 三、线程池与Executor框架 1. **ExecutorService**:Java 5引入了`ExecutorService`接口,它是线程池的抽象,通过...