`

同步与并发demo:增加吞吐量

 
阅读更多
package test.lock;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestConcurrent {
	private static final int LOCK_SIZE = 10;
	private static final Lock[] locks = new ReentrantLock[10];
	private static final Map<String, List<String>> dataMap ;
	static {
		dataMap = new ConcurrentHashMap<String, List<String>>();
		for (int i = 0; i < LOCK_SIZE; i++) {
			locks[i] = new ReentrantLock();
		}
	}

	public int count(String userid) {
		final Lock lock = locks[userid.hashCode() & (LOCK_SIZE - 1)];
		lock.lock();
		try {
			List<String> list = null;
			list = dataMap.get(userid);
			if (list != null) {
				System.out.println(Thread.currentThread().getName()
						+ ",userid:" + userid + ",get from cache");
			} else {
				System.out.println(Thread.currentThread().getName()
						+ ",userid:" + userid + ",get from db");
				list = loadData(userid);
				dataMap.put(userid, list);
			}
			return list != null ? list.size() : 0;
		} finally {
			lock.unlock();
		}
	}

	private List<String> loadData(String userid) {
		// 模拟耗时操作
		// sleep 1s
		sleeper(1000);
		Random r = new Random();
		List<String> list = new ArrayList<String>();
		int size = r.nextInt(1000);
		for (int i = 0; i < size; i++) {
			list.add(i + "");
		}
		return list;
	}

	private void sleeper(int time) {
		try {
			Thread.sleep(time);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		final TestConcurrent t = new TestConcurrent();
		ExecutorService executor = Executors.newCachedThreadPool();
		executor.submit(new Task(t, new String(20 + "")));
		executor.submit(new Task(t, new String(20 + "")));
		executor.submit(new Task(t, new String(20 + "")));
		executor.submit(new Task(t, new String(20 + "")));
		executor.submit(new Task(t, new String(20 + "")));
		executor.submit(new Task(t, new String(20 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(99 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(55 + "")));
		executor.submit(new Task(t, new String(22 + "")));
		executor.submit(new Task(t, new String(44 + "")));
		executor.submit(new Task(t, new String(11 + "")));
		executor.submit(new Task(t, new String(44 + "")));
		executor.submit(new Task(t, new String(57 + "")));
		executor.submit(new Task(t, new String(78 + "")));
		executor.submit(new Task(t, new String(54 + "")));
		executor.submit(new Task(t, new String(123 + "")));
		executor.submit(new Task(t, new String(452 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
		executor.submit(new Task(t, new String(88 + "")));
	}

	private static class Task implements Runnable {
		private TestConcurrent t;
		private String userid;

		public Task(TestConcurrent t, String userid) {
			this.t = t;
			this.userid = userid;
		}

		@Override
		public void run() {
			int result = t.count(userid);
			// TODO
		}
	}
}

 

输出结果:
pool-1-thread-1,userid:20,get from db
pool-1-thread-8,userid:88,get from db
pool-1-thread-24,userid:452,get from db
pool-1-thread-22,userid:54,get from db
pool-1-thread-3,userid:20,get from cache
pool-1-thread-10,userid:88,get from cache
pool-1-thread-2,userid:20,get from cache
pool-1-thread-12,userid:88,get from cache
pool-1-thread-6,userid:20,get from cache
pool-1-thread-5,userid:20,get from cache
pool-1-thread-14,userid:88,get from cache
pool-1-thread-4,userid:20,get from cache
pool-1-thread-16,userid:22,get from db
pool-1-thread-21,userid:78,get from db
pool-1-thread-18,userid:11,get from db
pool-1-thread-7,userid:88,get from cache
pool-1-thread-20,userid:57,get from db
pool-1-thread-9,userid:88,get from cache
pool-1-thread-11,userid:88,get from cache
pool-1-thread-13,userid:99,get from db
pool-1-thread-26,userid:88,get from cache
pool-1-thread-15,userid:55,get from db
pool-1-thread-28,userid:88,get from cache
pool-1-thread-17,userid:44,get from db
pool-1-thread-19,userid:44,get from cache
pool-1-thread-23,userid:123,get from db
pool-1-thread-27,userid:88,get from cache
pool-1-thread-25,userid:88,get from cache
pool-1-thread-29,userid:88,get from cache

 

分享到:
评论

相关推荐

    并发编程demo

    Disruptor使用环形缓冲区(Ring Buffer)作为数据结构,并通过序列化和事件处理器链路实现低延迟、高吞吐量的并发处理。在`DisruptorSrcTest`中,你可以看到Disruptor的源码分析和测试用例,这对于理解其工作原理和...

    JavaDemo:一天一天

    Disruptor是LMAX公司开发的一款高性能的并发框架,主要用于低延迟、高吞吐量的消息传递。它通过使用环形缓冲区和基于事件的处理模型,避免了传统同步机制的性能瓶颈。在JavaDemo中,你可以学习如何使用Disruptor来...

    disruptor-demo:希望想使用disruptor的同学可以尽快上手

    Disruptor是由LMAX公司开发并开源的一款高性能、低延迟的并发工具库,它主要应用于高吞吐量、低延迟的系统中,如金融交易系统。Disruptor的设计理念是通过减少线程间的共享数据交互来提高并发性能,从而达到优化多...

    DisruptorDemo.zip

    总结起来,Disruptor是通过创新的并发模型,优化了多线程间的通信,降低了锁竞争,提升了系统吞吐量。"DisruptorDemo.zip"的实例代码为我们提供了学习和理解Disruptor的一个良好起点,通过实际操作,我们可以更直观...

    DEMO.rar_DEMO

    通过这种方式,服务器可以同时处理多个客户端的请求,提高了系统的吞吐量。 总结一下,"DEMO.rar_DEMO"中的"DEMO-多循环应用程序构架"可能是一个简单的生产者消费者模型的实现,利用循环来持续监控和执行生产或消费...

    24L01收发一体多线程测试demo

    2. 并发性能:评估在高并发情况下,24L01和多线程程序的处理能力,如吞吐量、延迟等性能指标。 3. 功耗分析:测试多线程工作模式对24L01的功耗影响,尤其是在电池供电的应用场景下。 4. 错误处理:确保程序在异常...

    Disruptor demo

    Disruptor的设计理念是避免传统的锁机制,转而采用一种称为“环形缓冲区”的数据结构,以及基于事件的处理模型,从而实现低延迟、高吞吐量的并发编程。 在"Disruptor demo"中,我们可以看到如何使用Disruptor来实现...

    tcp网络通讯协议demo

    7. **性能优化**:在实际应用中,为了提高效率,可能会采用NIO(Non-blocking I/O)或AIO(Asynchronous I/O)来替代传统的IO模型,减少阻塞,提升系统吞吐量。 8. **TCP状态机**:TCP连接有多种状态,如CLOSED、...

    c#IPCO多线程并发业务处理

    异步I/O允许程序在等待I/O操作完成时执行其他任务,从而提高整体吞吐量。C#中的async/await关键字和Task类提供了异步编程的支持。 压缩包中的"TCPSOCKET.IPCO"可能是实现这些功能的源代码文件,包含了相关的类和...

    IOCP 的DEMO VC源码

    它将I/O操作与通知机制解耦,允许应用程序预分配线程池来处理完成的I/O操作,从而提高了系统的响应速度和吞吐量。 **IOCP的核心组件:** 1. **完成端口对象**:这是IOCP的中心,所有I/O操作都通过这个对象来完成。 ...

    IOCPDemo_NET_V1 iocp 完成端口 demo

    IOCPDemo_NET_V1 是一个.NET实现的完成端口示例,它展示了如何利用完成端口来提高并发性和系统吞吐量。 在传统的I/O模型中,如同步I/O或异步I/O,线程会阻塞等待I/O操作完成,这在处理大量并发请求时可能导致资源...

    kafka demo 两种线程消费方式

    而如果希望在一个实例内部最大化吞吐量,减少网络通信开销,那么“一个Consumer且有多个线程消费”会是更好的选择。 总的来说,这个Kafka Demo项目为学习和实践Kafka提供了很好的起点,涵盖了消费者模型、线程并行...

    IOCP_资料_demo_牧马客_全集

    标题中的“IOCP_资料_demo_牧马客_全集”表明这是一份关于IOCP(I/O完成端口)技术的资料集合,由“牧马客”提供,并且...通过理解和熟练掌握IOCP,开发者可以设计出更高效的网络服务,提高系统的吞吐量和响应速度。

    LMAX disruptor jar包+Demo+Api+src源码 disruptor-3.0.1.jar

    Disruptor的设计目标是解决多线程环境下的数据共享问题,通过优化并发性能,实现极低的延迟和高吞吐量。在Java社区中,Disruptor因其高效的数据交换机制而备受赞誉。 1. **Disruptor的核心概念** - **Ring Buffer*...

    VC的IOCP开发,iocp类,demo

    在Windows操作系统中,I/O完成端口(Input/Output Completion Port, IOCP)是一种高效的多线程并发处理I/O操作的机制,尤其适用于高吞吐量的网络服务器开发。在这个"VC的IOCP开发,iocp类,demo"中,我们将深入探讨...

    ioking无锁线程通讯(no-lockl)demo

    据描述,该系统能够达到每秒处理1450万笔消息,这表明其设计具有极高的并发能力和吞吐量。 压缩包内的两个可执行文件"MsgEngineDemo(ioking no-lock)(64).exe"和"MsgEngineDemo(ioking no-lock)(32).exe"分别是适用...

    disrupter的使用简单demo

    通过消除锁和最小化内存屏障,Disruptor能够实现极低的延迟和高吞吐量。这使得Disruptor成为金融交易、实时分析和其他对性能要求苛刻的领域的理想选择。在实际应用中,可以根据需求调整缓冲区大小、选择合适的等待...

    springboot 同步解耦 异步化

    5. **WebFlux**:SpringBoot 2.x引入了WebFlux,它支持反应式编程模型,可以在Web层实现异步处理,提高服务器的吞吐量。 在压缩包文件`showandshare.demo.syncevnet`中,可能包含了具体的示例代码或者教程,这些...

    subIPC-multicore

    在多核环境下,subIPC能帮助开发者将工作负载分布到不同的核心上,实现负载均衡,降低单个核心的负担,提高系统吞吐量。 三、在多个CPU ID上的应用 在“subIPC-multicore”场景下,每个CPU核心可能运行一个或多...

    read_write_separation_demo.zip

    这样,主库可以专注于处理较少但关键的写操作,而从库则可以处理大量的读请求,提高了系统的整体吞吐量。 接下来,我们要关注的是压缩包中的"DataSourceAop"。AOP(Aspect Oriented Programming,面向切面编程)是...

Global site tag (gtag.js) - Google Analytics