Java Concurrent Control Thread Number

http://technicalmumbojumbo.wordpress.com/2010/02/21/java-util-concurrent-java-5-semaphore/

java.util.concurrent: Java 5 Semaphore

Posted on February 21, 2010
This post is a continuation in the series on Java 5 concurrency utilities. My previous posts dealt with Atomic classes and Locks. This one will focus on the Semaphore class.


The purpose of the Semaphore class is to limit the number of threads that access a resource concurrently. Consider a resource-intensive component, such as a business component that needs a lot of memory for its computations. We cannot let a large number of consumers access this component at the same time: memory is finite, so uncontrolled concurrent access leads to memory contention and an inevitable slowdown. To address this, Java 5 provides the Semaphore class. The client program creates a Semaphore instance and specifies the maximum number of threads that may concurrently access the resources it guards. The units of this limit are called “permits”.
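As a minimal sketch of the permit idea, the snippet below creates a semaphore with three permits, takes one, and reports how many remain. The class name PermitCountSketch and the constant MAX_PERMITS are made up for this illustration; acquireUninterruptibly is used only to keep the sketch free of exception handling.

```java
import java.util.concurrent.Semaphore;

class PermitCountSketch {

    // Hypothetical limit, chosen only for this illustration.
    static final int MAX_PERMITS = 3;

    // Takes one permit and reports how many remain while it is held.
    static int permitsWhileOneHeld() {
        Semaphore semaphore = new Semaphore(MAX_PERMITS);
        semaphore.acquireUninterruptibly(); // would wait here if no permit were free
        try {
            return semaphore.availablePermits();
        } finally {
            semaphore.release(); // hand the permit back
        }
    }

    public static void main(String[] args) {
        System.out.println("Permits while one is held: " + permitsWhileOneHeld());
    }
}
```

With three permits and one of them held, the method returns 2.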

A consumer thread has two ways to obtain a permit from a semaphore: invoke the acquire method or the tryAcquire method on the Semaphore instance. The acquire method is a blocking call; it waits until a permit becomes available (or the thread is interrupted). The no-argument tryAcquire, on the other hand, attempts the acquisition only once; if no permit is available it returns false immediately without blocking. Both methods have overloaded versions that take the number of permits to acquire. tryAcquire also has overloaded versions that wait for the permit(s) for up to a specified period of time. To return a permit or permits to the Semaphore instance, the consumer invokes release or one of its overloads. The key point to note is that the Semaphore instance keeps no record of which threads acquired permits and which ones release them. It is the responsibility of the application developer to enforce that discipline if desired. Depending on the end user’s viewpoint this is a plus or a minus. I would have preferred the Semaphore class to maintain such a history, but that was not to be.
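The variants described above can be sketched as follows. The class and method names are hypothetical and exist only for this illustration; the Semaphore calls themselves (tryAcquire(), tryAcquire(int), tryAcquire(long, TimeUnit), release(), release(int), availablePermits()) are the standard java.util.concurrent ones. The last method shows the lack of ownership tracking: a release with no matching acquire simply raises the permit count.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

class AcquireVariantsSketch {

    // Non-blocking attempt: fails at once when no permit is free.
    static boolean tryWhenExhausted() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.tryAcquire();        // takes the only permit
        return semaphore.tryAcquire(); // nothing left, returns false immediately
    }

    // Timed attempt: waits up to the timeout, then gives up.
    static boolean timedTryWhenExhausted() {
        Semaphore semaphore = new Semaphore(0);
        try {
            return semaphore.tryAcquire(100, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Multi-permit overloads: take three permits in one call, then return them.
    static int permitsAfterBulkAcquire() {
        Semaphore semaphore = new Semaphore(5);
        semaphore.tryAcquire(3);
        int left = semaphore.availablePermits();
        semaphore.release(3);
        return left;
    }

    // No ownership tracking: releasing without acquiring adds a permit.
    static int permitsAfterUnmatchedRelease() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.release();
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("tryAcquire with no permits: " + tryWhenExhausted());
        System.out.println("timed tryAcquire with no permits: " + timedTryWhenExhausted());
        System.out.println("left after acquiring 3 of 5: " + permitsAfterBulkAcquire());
        System.out.println("permits after unmatched release: " + permitsAfterUnmatchedRelease());
    }
}
```

Because of the last behaviour, a stray release can silently raise the effective limit, which is why pairing each successful acquire with exactly one release (typically in a finally block) is worth the extra care.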

Anyway, enough theory; let’s look at how to put a Semaphore to use. Consider the following class, ComplexBusinessProcess.

package com.test.concurrency.semaphore;

import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.concurrent.Semaphore;

public class ComplexBusinessProcess {

	private static final int MAX_THREADS = 3;

	private static Semaphore accessControl = new Semaphore(MAX_THREADS);

	private String name = null;

	public ComplexBusinessProcess(String name) {
		this.name = name;
	}

	public void doSomething() {
		// Simulates processing that is extremely complex and memory
		// intensive, so access is limited to MAX_THREADS threads.

		// Blocks until a permit is available.
		this.acquireAccess();
		printMessage("Complex processing started for " + this.name + ".");

		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can observe it.
			Thread.currentThread().interrupt();
		} finally {
			// Always return the permit, even if processing fails.
			this.releaseAccess();
		}
		printMessage("Processing completed by " + this.name + ".");

	}

	public void doSomethingConditionally() {
		// Same simulated workload as doSomething, but the thread gives up
		// immediately if no permit is available instead of waiting.

		if (!this.attemptAccess()) {
			printMessage("Complex processing not initiated for " + this.name + ".");
			return;
		}
		printMessage("Complex processing started for " + this.name + ".");

		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so callers can observe it.
			Thread.currentThread().interrupt();
		} finally {
			// Always return the permit, even if processing fails.
			this.releaseAccess();
		}
		printMessage("Processing completed by " + this.name + ".");

	}

	private static void printMessage(String message) {
		DateFormat df = new SimpleDateFormat("HH:mm:ss");
		System.out.println(df.format(new java.util.Date()) + " " + message);
	}

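	private boolean acquireAccess() {
		try {
			// Blocks until a permit is free or the thread is interrupted.
			accessControl.acquire();
		} catch (InterruptedException e) {
			// Restore the interrupt flag before reporting the failure.
			Thread.currentThread().interrupt();
			throw new RuntimeException("Unable to acquire Semaphore.", e);
		}
		return true;
	}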

	private boolean attemptAccess() {
		return accessControl.tryAcquire();
	}

	public boolean releaseAccess() {
		accessControl.release();
		return true;
	}

}
The ComplexBusinessProcess class models a resource-intensive business process. It holds a static Semaphore field named accessControl, constructed with a limit of three permits and therefore shared by all instances of the class. Permits are acquired and released through three methods: acquireAccess, attemptAccess and releaseAccess. Two business methods, doSomething and doSomethingConditionally, simulate the complex processing; for now they simply put the current thread to sleep for 5 seconds. The doSomething method takes the blocking acquisition route and doSomethingConditionally the non-blocking one.

The functionality can be tested with the following test classes. SemaphoreTest and Processor exercise the doSomething method; ConditionalSemaphoreTest and ConditionalProcessor exercise doSomethingConditionally. The source code follows.

package com.test.concurrency.semaphore;

public class Processor implements Runnable {

	private String threadName = null;

	public Processor(String name) {
		this.threadName = name;
	}

	public void run() {
		ComplexBusinessProcess busProcess = new ComplexBusinessProcess(this.threadName);
		busProcess.doSomething();
	}

}

package com.test.concurrency.semaphore;

public class SemaphoreTest {

	public static void main(String[] args) {
		for(int i=1; i<5; i++) {
			Processor processor = new Processor("THREAD-" + i);
			new Thread(processor).start();
		}
	}

}

package com.test.concurrency.semaphore;

public class ConditionalProcessor implements Runnable {

	private String threadName = null;

	public ConditionalProcessor(String name) {
		this.threadName = name;
	}

	public void run() {
		ComplexBusinessProcess busProcess = new ComplexBusinessProcess(this.threadName);
		busProcess.doSomethingConditionally();
	}

}

package com.test.concurrency.semaphore;

public class ConditionalSemaphoreTest {

	public static void main(String[] args) {
		for(int i=1; i<5; i++) {
			ConditionalProcessor processor = new ConditionalProcessor("THREAD-" + i);
			new Thread(processor).start();
		}
	}

}
The test client code is self-explanatory.

Console output for SemaphoreTest:

12:14:21 Complex processing started for THREAD-4.
12:14:21 Complex processing started for THREAD-1.
12:14:21 Complex processing started for THREAD-2.
12:14:26 Processing completed by THREAD-2.
12:14:26 Complex processing started for THREAD-3.
12:14:26 Processing completed by THREAD-1.
12:14:26 Processing completed by THREAD-4.
12:14:31 Processing completed by THREAD-3.
The test client starts 4 threads. Three of them obtain permits straight away, and the remaining one (THREAD-3 in this run) is blocked because no permit is available. When one of the first three threads completes, in our case THREAD-2, the blocked thread is allowed to proceed.
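The limit seen in this output can also be checked programmatically rather than by reading timestamps. The sketch below is not part of the original test classes: the class ConcurrencyLimitSketch and its observedPeak method are hypothetical, and it assumes Java 8 or later for lambdas and AtomicInteger.accumulateAndGet. It counts how many workers hold a permit at the same moment and records the highest value seen.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class ConcurrencyLimitSketch {

    // Runs `threads` workers against `permits` permits and returns the
    // highest number of workers observed inside the guarded section at once.
    static int observedPeak(int permits, int threads) {
        final Semaphore semaphore = new Semaphore(permits);
        final AtomicInteger active = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();

        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                semaphore.acquireUninterruptibly(); // wait for a permit
                try {
                    int now = active.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max); // remember the maximum
                    Thread.sleep(50); // stand-in for the real work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    active.decrementAndGet();
                    semaphore.release(); // always return the permit
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("Peak concurrency with 3 permits and 4 threads: " + observedPeak(3, 4));
    }
}
```

The exact peak depends on thread scheduling, but it can never exceed the number of permits, which is the guarantee the semaphore provides.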

Console output for ConditionalSemaphoreTest:

12:16:45 Complex processing started for THREAD-4.
12:16:45 Complex processing started for THREAD-2.
12:16:45 Complex processing not initiated for THREAD-3.
12:16:45 Complex processing started for THREAD-1.
12:16:50 Processing completed by THREAD-1.
12:16:50 Processing completed by THREAD-2.
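12:16:50 Processing completed by THREAD-4.
In this run THREAD-3 found no free permit, so tryAcquire returned false and the thread gave up immediately instead of waiting; only three threads did any processing.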